1use crate::cluster_client::ClusterClient;
8use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
9use crate::error::{OperatorError, Result};
10use chrono::Utc;
11use futures::StreamExt;
12use kube::api::{Api, Patch, PatchParams};
13use kube::runtime::controller::{Action, Controller};
14use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
15use kube::runtime::watcher::Config;
16use kube::{Client, ResourceExt};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20use validator::Validate;
21
/// Finalizer name registered on `RivvenTopic` resources handled by this controller.
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";

/// Seconds to wait before re-reconciling a topic after a successful pass.
const DEFAULT_REQUEUE_SECONDS: u64 = 120;

/// Seconds to wait before retrying a topic after a failed pass.
const ERROR_REQUEUE_SECONDS: u64 = 30;
30
31pub struct TopicControllerContext {
33 pub client: Client,
35 pub cluster_client: ClusterClient,
37 pub metrics: Option<TopicControllerMetrics>,
39}
40
41#[derive(Clone)]
43pub struct TopicControllerMetrics {
44 pub reconciliations: metrics::Counter,
46 pub errors: metrics::Counter,
48 pub duration: metrics::Histogram,
50 pub topics_total: metrics::Gauge,
52}
53
54impl TopicControllerMetrics {
55 pub fn new() -> Self {
57 Self {
58 reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
59 errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
60 duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
61 topics_total: metrics::gauge!("rivven_topic_managed_total"),
62 }
63 }
64}
65
66impl Default for TopicControllerMetrics {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
74 let topics: Api<RivvenTopic> = match &namespace {
75 Some(ns) => Api::namespaced(client.clone(), ns),
76 None => Api::all(client.clone()),
77 };
78
79 let ctx = Arc::new(TopicControllerContext {
80 client: client.clone(),
81 cluster_client: ClusterClient::new(),
82 metrics: Some(TopicControllerMetrics::new()),
83 });
84
85 info!(
86 namespace = namespace.as_deref().unwrap_or("all"),
87 "Starting RivvenTopic controller"
88 );
89
90 Controller::new(topics.clone(), Config::default())
91 .run(reconcile_topic, topic_error_policy, ctx)
92 .for_each(|result| async move {
93 match result {
94 Ok((obj, action)) => {
95 debug!(
96 name = obj.name,
97 namespace = obj.namespace,
98 ?action,
99 "Topic reconciliation completed"
100 );
101 }
102 Err(e) => {
103 error!(error = %e, "Topic reconciliation failed");
104 }
105 }
106 })
107 .await;
108
109 Ok(())
110}
111
112#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
114async fn reconcile_topic(
115 topic: Arc<RivvenTopic>,
116 ctx: Arc<TopicControllerContext>,
117) -> Result<Action> {
118 let start = std::time::Instant::now();
119
120 if let Some(ref metrics) = ctx.metrics {
121 metrics.reconciliations.increment(1);
122 }
123
124 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
125 let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
126
127 let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
128 match event {
129 FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
130 FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
131 }
132 })
133 .await;
134
135 if let Some(ref metrics) = ctx.metrics {
136 metrics.duration.record(start.elapsed().as_secs_f64());
137 }
138
139 result.map_err(|e| {
140 if let Some(ref metrics) = ctx.metrics {
141 metrics.errors.increment(1);
142 }
143 OperatorError::ReconcileFailed(e.to_string())
144 })
145}
146
147#[instrument(skip(topic, ctx))]
149async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
150 let name = topic.name_any();
151 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
152
153 info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
154
155 if let Err(errors) = topic.spec.validate() {
157 let error_messages: Vec<String> = errors
158 .field_errors()
159 .iter()
160 .flat_map(|(field, errs)| {
161 errs.iter()
162 .map(move |e| format!("{}: {:?}", field, e.message))
163 })
164 .collect();
165 let error_msg = error_messages.join("; ");
166 warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
167
168 update_topic_status(
169 &ctx.client,
170 &namespace,
171 &name,
172 build_failed_status(&topic, &error_msg),
173 )
174 .await?;
175
176 return Err(OperatorError::InvalidConfig(error_msg));
177 }
178
179 let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
181
182 let broker_endpoints = get_cluster_endpoints(&cluster);
184
185 if broker_endpoints.is_empty() {
186 warn!(name = %name, "Cluster has no ready broker endpoints");
187 update_topic_status(
188 &ctx.client,
189 &namespace,
190 &name,
191 build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
192 )
193 .await?;
194 return Ok(Action::requeue(Duration::from_secs(30)));
195 }
196
197 let topic_result =
199 reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;
200
201 match topic_result {
202 Ok(topic_info) => {
203 let status = build_ready_status(&topic, topic_info);
205 update_topic_status(&ctx.client, &namespace, &name, status).await?;
206 if let Some(ref metrics) = ctx.metrics {
207 metrics.topics_total.increment(1.0);
208 }
209 info!(name = %name, "Topic reconciliation complete");
210 Ok(Action::requeue(Duration::from_secs(
211 DEFAULT_REQUEUE_SECONDS,
212 )))
213 }
214 Err(e) => {
215 let status = build_error_status(&topic, &e.to_string());
217 update_topic_status(&ctx.client, &namespace, &name, status).await?;
218 warn!(name = %name, error = %e, "Topic reconciliation failed");
219 Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
220 }
221 }
222}
223
224async fn verify_cluster_ref(
226 client: &Client,
227 namespace: &str,
228 cluster_ref: &ClusterReference,
229) -> Result<crate::crd::RivvenCluster> {
230 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
231 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
232
233 match clusters.get(&cluster_ref.name).await {
234 Ok(cluster) => Ok(cluster),
235 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
236 format!("{}/{}", cluster_ns, cluster_ref.name),
237 )),
238 Err(e) => Err(OperatorError::from(e)),
239 }
240}
241
242fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
244 cluster
245 .status
246 .as_ref()
247 .map(|s| s.broker_endpoints.clone())
248 .unwrap_or_default()
249}
250
251struct TopicInfo {
253 partitions: i32,
255 replication_factor: i32,
257 existed: bool,
259 partition_info: Vec<PartitionInfo>,
261}
262
263async fn reconcile_topic_with_cluster(
267 topic: &RivvenTopic,
268 broker_endpoints: &[String],
269 cluster_client: &ClusterClient,
270) -> Result<TopicInfo> {
271 let topic_name = topic.name_any();
272 let spec = &topic.spec;
273
274 info!(
275 topic = %topic_name,
276 partitions = spec.partitions,
277 replication = spec.replication_factor,
278 brokers = ?broker_endpoints,
279 "Reconciling topic with cluster"
280 );
281
282 let cluster_topic_info = cluster_client
284 .ensure_topic(broker_endpoints, &topic_name, spec.partitions as u32)
285 .await?;
286
287 let partition_info: Vec<PartitionInfo> = (0..cluster_topic_info.partitions as i32)
290 .map(|i| PartitionInfo {
291 partition: i,
292 leader: (i % 3), replicas: (0..spec.replication_factor).collect(),
294 isr: (0..spec.replication_factor).collect(),
295 })
296 .collect();
297
298 info!(
299 topic = %topic_name,
300 partitions = cluster_topic_info.partitions,
301 existed = cluster_topic_info.existed,
302 "Topic reconciliation successful"
303 );
304
305 Ok(TopicInfo {
306 partitions: cluster_topic_info.partitions as i32,
307 replication_factor: spec.replication_factor,
308 existed: cluster_topic_info.existed,
309 partition_info,
310 })
311}
312
313fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
315 let now = Utc::now().to_rfc3339();
316
317 let conditions = vec![
318 TopicCondition {
319 r#type: "Ready".to_string(),
320 status: "True".to_string(),
321 reason: "TopicReady".to_string(),
322 message: "Topic is ready and serving traffic".to_string(),
323 last_transition_time: now.clone(),
324 },
325 TopicCondition {
326 r#type: "Synced".to_string(),
327 status: "True".to_string(),
328 reason: "SyncSucceeded".to_string(),
329 message: if info.existed {
330 "Topic configuration synchronized".to_string()
331 } else {
332 "Topic created successfully".to_string()
333 },
334 last_transition_time: now.clone(),
335 },
336 TopicCondition {
337 r#type: "ConfigApplied".to_string(),
338 status: "True".to_string(),
339 reason: "ConfigApplied".to_string(),
340 message: "Topic configuration applied".to_string(),
341 last_transition_time: now.clone(),
342 },
343 TopicCondition {
344 r#type: "ACLsApplied".to_string(),
345 status: if topic.spec.acls.is_empty() {
346 "N/A".to_string()
347 } else {
348 "True".to_string()
349 },
350 reason: if topic.spec.acls.is_empty() {
351 "NoACLs".to_string()
352 } else {
353 "ACLsApplied".to_string()
354 },
355 message: if topic.spec.acls.is_empty() {
356 "No ACLs configured".to_string()
357 } else {
358 format!("{} ACL entries applied", topic.spec.acls.len())
359 },
360 last_transition_time: now.clone(),
361 },
362 ];
363
364 RivvenTopicStatus {
365 phase: "Ready".to_string(),
366 message: "Topic is ready".to_string(),
367 current_partitions: info.partitions,
368 current_replication_factor: info.replication_factor,
369 topic_exists: true,
370 observed_generation: topic.metadata.generation.unwrap_or(0),
371 conditions,
372 last_sync_time: Some(now),
373 partition_info: info.partition_info,
374 }
375}
376
377fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
379 let now = Utc::now().to_rfc3339();
380
381 RivvenTopicStatus {
382 phase: "Pending".to_string(),
383 message: message.to_string(),
384 current_partitions: 0,
385 current_replication_factor: 0,
386 topic_exists: false,
387 observed_generation: topic.metadata.generation.unwrap_or(0),
388 conditions: vec![TopicCondition {
389 r#type: "Ready".to_string(),
390 status: "False".to_string(),
391 reason: "Pending".to_string(),
392 message: message.to_string(),
393 last_transition_time: now.clone(),
394 }],
395 last_sync_time: Some(now),
396 partition_info: vec![],
397 }
398}
399
400fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
402 let now = Utc::now().to_rfc3339();
403
404 RivvenTopicStatus {
405 phase: "Failed".to_string(),
406 message: error_msg.to_string(),
407 current_partitions: 0,
408 current_replication_factor: 0,
409 topic_exists: false,
410 observed_generation: topic.metadata.generation.unwrap_or(0),
411 conditions: vec![TopicCondition {
412 r#type: "Ready".to_string(),
413 status: "False".to_string(),
414 reason: "ValidationFailed".to_string(),
415 message: error_msg.to_string(),
416 last_transition_time: now.clone(),
417 }],
418 last_sync_time: Some(now),
419 partition_info: vec![],
420 }
421}
422
423fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
425 let now = Utc::now().to_rfc3339();
426
427 let existing_status = topic.status.clone().unwrap_or_default();
429
430 RivvenTopicStatus {
431 phase: "Error".to_string(),
432 message: error_msg.to_string(),
433 current_partitions: existing_status.current_partitions,
434 current_replication_factor: existing_status.current_replication_factor,
435 topic_exists: existing_status.topic_exists,
436 observed_generation: topic.metadata.generation.unwrap_or(0),
437 conditions: vec![
438 TopicCondition {
439 r#type: "Ready".to_string(),
440 status: if existing_status.topic_exists {
441 "True".to_string()
442 } else {
443 "False".to_string()
444 },
445 reason: "ReconcileError".to_string(),
446 message: error_msg.to_string(),
447 last_transition_time: now.clone(),
448 },
449 TopicCondition {
450 r#type: "Synced".to_string(),
451 status: "False".to_string(),
452 reason: "SyncFailed".to_string(),
453 message: error_msg.to_string(),
454 last_transition_time: now.clone(),
455 },
456 ],
457 last_sync_time: Some(now),
458 partition_info: existing_status.partition_info,
459 }
460}
461
462async fn update_topic_status(
464 client: &Client,
465 namespace: &str,
466 name: &str,
467 status: RivvenTopicStatus,
468) -> Result<()> {
469 let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
470
471 debug!(name = %name, phase = %status.phase, "Updating topic status");
472
473 let patch = serde_json::json!({
474 "status": status
475 });
476
477 let patch_params = PatchParams::default();
478 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
479 .await
480 .map_err(OperatorError::from)?;
481
482 Ok(())
483}
484
485#[instrument(skip(topic, ctx))]
487async fn cleanup_topic(
488 topic: Arc<RivvenTopic>,
489 ctx: Arc<TopicControllerContext>,
490) -> Result<Action> {
491 let name = topic.name_any();
492 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
493
494 info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
495
496 if topic.spec.delete_on_remove {
498 info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
499
500 match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
502 Ok(cluster) => {
503 let endpoints = get_cluster_endpoints(&cluster);
504 if !endpoints.is_empty() {
505 if let Err(e) =
507 delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
508 {
509 warn!(
510 name = %name,
511 error = %e,
512 "Failed to delete topic from cluster (may not exist)"
513 );
514 }
515 }
516 }
517 Err(e) => {
518 warn!(
519 name = %name,
520 error = %e,
521 "Cluster not found during cleanup, topic may be orphaned"
522 );
523 }
524 }
525 } else {
526 info!(
527 name = %name,
528 "delete_on_remove is false, topic will remain in cluster"
529 );
530 }
531
532 info!(name = %name, "Topic cleanup complete");
533
534 if let Some(ref metrics) = ctx.metrics {
535 metrics.topics_total.decrement(1.0);
536 }
537
538 Ok(Action::await_change())
539}
540
541async fn delete_topic_from_cluster(
543 topic_name: &str,
544 broker_endpoints: &[String],
545 cluster_client: &ClusterClient,
546) -> Result<()> {
547 info!(topic = %topic_name, "Deleting topic from cluster");
548 cluster_client
549 .delete_topic(broker_endpoints, topic_name)
550 .await
551}
552
553fn topic_error_policy(
555 _topic: Arc<RivvenTopic>,
556 error: &OperatorError,
557 _ctx: Arc<TopicControllerContext>,
558) -> Action {
559 warn!(
560 error = %error,
561 "Topic reconciliation error, will retry"
562 );
563
564 let delay = error
565 .requeue_delay()
566 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
567
568 Action::requeue(delay)
569}
570
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Topic fixture: 6 partitions, replication factor 3, no ACLs, no status.
    fn create_test_topic() -> RivvenTopic {
        let metadata = ObjectMeta {
            name: Some("test-topic".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenTopicSpec {
            cluster_ref: ClusterReference {
                name: "test-cluster".to_string(),
                namespace: None,
            },
            partitions: 6,
            replication_factor: 3,
            config: TopicConfig::default(),
            acls: vec![],
            delete_on_remove: true,
            topic_labels: BTreeMap::new(),
        };

        RivvenTopic {
            metadata,
            spec,
            status: None,
        }
    }

    /// Same fixture as `create_test_topic`, with two `Allow` ACL entries added.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let allow = |principal: &str, operations: &[&str]| TopicAcl {
            principal: principal.to_string(),
            operations: operations.iter().map(|op| op.to_string()).collect(),
            permission_type: "Allow".to_string(),
            host: "*".to_string(),
        };

        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            allow("user:app1", &["Read", "Write"]),
            allow("user:analytics", &["Read"]),
        ];
        topic
    }

    /// Partition 0 led by replica 0 with replicas and ISR `[0, 1, 2]`.
    fn sample_partition() -> PartitionInfo {
        PartitionInfo {
            partition: 0,
            leader: 0,
            replicas: vec![0, 1, 2],
            isr: vec![0, 1, 2],
        }
    }

    /// `TopicInfo` with 6 partitions and replication factor 3.
    fn sample_info(existed: bool, partition_info: Vec<PartitionInfo>) -> TopicInfo {
        TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed,
            partition_info,
        }
    }

    /// Returns the condition of the given type, panicking when it is absent.
    fn condition<'a>(status: &'a RivvenTopicStatus, kind: &str) -> &'a TopicCondition {
        status
            .conditions
            .iter()
            .find(|c| c.r#type == kind)
            .unwrap()
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = sample_info(false, vec![sample_partition()]);

        let status = build_ready_status(&topic, info);

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        assert_eq!(condition(&status, "Ready").status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();

        let status = build_ready_status(&topic, sample_info(true, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();

        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();

        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = condition(&status, "Ready");
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        let previous = RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![sample_partition()],
        };
        let mut topic = create_test_topic();
        topic.status = Some(previous);

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();
        let replication = topic.spec.replication_factor;

        // Mirrors the layout rule under test: leaders cycle through 0, 1, 2.
        let partition_info: Vec<PartitionInfo> = (0..topic.spec.partitions)
            .map(|i| PartitionInfo {
                partition: i,
                leader: i % 3,
                replicas: (0..replication).collect(),
                isr: (0..replication).collect(),
            })
            .collect();

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0);
        assert_eq!(partition_info[4].leader, 1);
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();

        let status = build_ready_status(&topic, sample_info(false, vec![]));

        let condition_types: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        for expected in ["Ready", "Synced", "ConfigApplied", "ACLsApplied"].iter() {
            assert!(condition_types.contains(expected));
        }
    }

    #[test]
    fn test_no_acls_condition() {
        // The base fixture has an empty ACL list.
        let topic = create_test_topic();

        let status = build_ready_status(&topic, sample_info(false, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}