1use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
8use crate::error::{OperatorError, Result};
9use chrono::Utc;
10use futures::StreamExt;
11use kube::api::{Api, Patch, PatchParams};
12use kube::runtime::controller::{Action, Controller};
13use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
14use kube::runtime::watcher::Config;
15use kube::{Client, ResourceExt};
16use std::sync::Arc;
17use std::time::Duration;
18use tracing::{debug, error, info, instrument, warn};
19use validator::Validate;
20
/// Finalizer identifier that `reconcile_topic` hands to the kube `finalizer` helper.
pub const TOPIC_FINALIZER: &str = "rivven.io/topic-finalizer";

/// Seconds to wait before re-reconciling a topic whose last reconciliation succeeded.
const DEFAULT_REQUEUE_SECONDS: u64 = 2 * 60;

/// Seconds to wait before retrying after a failed reconciliation; also the fallback
/// delay in `topic_error_policy` when the error itself supplies none.
const ERROR_REQUEUE_SECONDS: u64 = 30;
29
30pub struct TopicControllerContext {
32 pub client: Client,
34 pub metrics: Option<TopicControllerMetrics>,
36}
37
38#[derive(Clone)]
40pub struct TopicControllerMetrics {
41 pub reconciliations: metrics::Counter,
43 pub errors: metrics::Counter,
45 pub duration: metrics::Histogram,
47 pub topics_total: metrics::Gauge,
49}
50
51impl TopicControllerMetrics {
52 pub fn new() -> Self {
54 Self {
55 reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
56 errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
57 duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
58 topics_total: metrics::gauge!("rivven_topic_managed_total"),
59 }
60 }
61}
62
63impl Default for TopicControllerMetrics {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
71 let topics: Api<RivvenTopic> = match &namespace {
72 Some(ns) => Api::namespaced(client.clone(), ns),
73 None => Api::all(client.clone()),
74 };
75
76 let ctx = Arc::new(TopicControllerContext {
77 client: client.clone(),
78 metrics: Some(TopicControllerMetrics::new()),
79 });
80
81 info!(
82 namespace = namespace.as_deref().unwrap_or("all"),
83 "Starting RivvenTopic controller"
84 );
85
86 Controller::new(topics.clone(), Config::default())
87 .run(reconcile_topic, topic_error_policy, ctx)
88 .for_each(|result| async move {
89 match result {
90 Ok((obj, action)) => {
91 debug!(
92 name = obj.name,
93 namespace = obj.namespace,
94 ?action,
95 "Topic reconciliation completed"
96 );
97 }
98 Err(e) => {
99 error!(error = %e, "Topic reconciliation failed");
100 }
101 }
102 })
103 .await;
104
105 Ok(())
106}
107
108#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
110async fn reconcile_topic(
111 topic: Arc<RivvenTopic>,
112 ctx: Arc<TopicControllerContext>,
113) -> Result<Action> {
114 let start = std::time::Instant::now();
115
116 if let Some(ref metrics) = ctx.metrics {
117 metrics.reconciliations.increment(1);
118 }
119
120 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
121 let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
122
123 let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
124 match event {
125 FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
126 FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
127 }
128 })
129 .await;
130
131 if let Some(ref metrics) = ctx.metrics {
132 metrics.duration.record(start.elapsed().as_secs_f64());
133 }
134
135 result.map_err(|e| {
136 if let Some(ref metrics) = ctx.metrics {
137 metrics.errors.increment(1);
138 }
139 OperatorError::ReconcileFailed(e.to_string())
140 })
141}
142
143#[instrument(skip(topic, ctx))]
145async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
146 let name = topic.name_any();
147 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
148
149 info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
150
151 if let Err(errors) = topic.spec.validate() {
153 let error_messages: Vec<String> = errors
154 .field_errors()
155 .iter()
156 .flat_map(|(field, errs)| {
157 errs.iter()
158 .map(move |e| format!("{}: {:?}", field, e.message))
159 })
160 .collect();
161 let error_msg = error_messages.join("; ");
162 warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
163
164 update_topic_status(
165 &ctx.client,
166 &namespace,
167 &name,
168 build_failed_status(&topic, &error_msg),
169 )
170 .await?;
171
172 return Err(OperatorError::InvalidConfig(error_msg));
173 }
174
175 let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
177
178 let broker_endpoints = get_cluster_endpoints(&cluster);
180
181 if broker_endpoints.is_empty() {
182 warn!(name = %name, "Cluster has no ready broker endpoints");
183 update_topic_status(
184 &ctx.client,
185 &namespace,
186 &name,
187 build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
188 )
189 .await?;
190 return Ok(Action::requeue(Duration::from_secs(30)));
191 }
192
193 let topic_result = reconcile_topic_with_cluster(&topic, &broker_endpoints).await;
195
196 match topic_result {
197 Ok(topic_info) => {
198 let status = build_ready_status(&topic, topic_info);
200 update_topic_status(&ctx.client, &namespace, &name, status).await?;
201 info!(name = %name, "Topic reconciliation complete");
202 Ok(Action::requeue(Duration::from_secs(
203 DEFAULT_REQUEUE_SECONDS,
204 )))
205 }
206 Err(e) => {
207 let status = build_error_status(&topic, &e.to_string());
209 update_topic_status(&ctx.client, &namespace, &name, status).await?;
210 warn!(name = %name, error = %e, "Topic reconciliation failed");
211 Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
212 }
213 }
214}
215
216async fn verify_cluster_ref(
218 client: &Client,
219 namespace: &str,
220 cluster_ref: &ClusterReference,
221) -> Result<crate::crd::RivvenCluster> {
222 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
223 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
224
225 match clusters.get(&cluster_ref.name).await {
226 Ok(cluster) => Ok(cluster),
227 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
228 format!("{}/{}", cluster_ns, cluster_ref.name),
229 )),
230 Err(e) => Err(OperatorError::from(e)),
231 }
232}
233
234fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
236 cluster
237 .status
238 .as_ref()
239 .map(|s| s.broker_endpoints.clone())
240 .unwrap_or_default()
241}
242
243struct TopicInfo {
245 partitions: i32,
247 replication_factor: i32,
249 existed: bool,
251 partition_info: Vec<PartitionInfo>,
253}
254
255async fn reconcile_topic_with_cluster(
259 topic: &RivvenTopic,
260 broker_endpoints: &[String],
261) -> Result<TopicInfo> {
262 let topic_name = topic.name_any();
263 let spec = &topic.spec;
264
265 info!(
276 topic = %topic_name,
277 partitions = spec.partitions,
278 replication = spec.replication_factor,
279 brokers = ?broker_endpoints,
280 "Reconciling topic with cluster"
281 );
282
283 let partition_info: Vec<PartitionInfo> = (0..spec.partitions)
285 .map(|i| PartitionInfo {
286 partition: i,
287 leader: (i % 3), replicas: (0..spec.replication_factor).collect(),
289 isr: (0..spec.replication_factor).collect(),
290 })
291 .collect();
292
293 Ok(TopicInfo {
294 partitions: spec.partitions,
295 replication_factor: spec.replication_factor,
296 existed: false, partition_info,
298 })
299}
300
301fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
303 let now = Utc::now().to_rfc3339();
304
305 let conditions = vec![
306 TopicCondition {
307 r#type: "Ready".to_string(),
308 status: "True".to_string(),
309 reason: "TopicReady".to_string(),
310 message: "Topic is ready and serving traffic".to_string(),
311 last_transition_time: now.clone(),
312 },
313 TopicCondition {
314 r#type: "Synced".to_string(),
315 status: "True".to_string(),
316 reason: "SyncSucceeded".to_string(),
317 message: if info.existed {
318 "Topic configuration synchronized".to_string()
319 } else {
320 "Topic created successfully".to_string()
321 },
322 last_transition_time: now.clone(),
323 },
324 TopicCondition {
325 r#type: "ConfigApplied".to_string(),
326 status: "True".to_string(),
327 reason: "ConfigApplied".to_string(),
328 message: "Topic configuration applied".to_string(),
329 last_transition_time: now.clone(),
330 },
331 TopicCondition {
332 r#type: "ACLsApplied".to_string(),
333 status: if topic.spec.acls.is_empty() {
334 "N/A".to_string()
335 } else {
336 "True".to_string()
337 },
338 reason: if topic.spec.acls.is_empty() {
339 "NoACLs".to_string()
340 } else {
341 "ACLsApplied".to_string()
342 },
343 message: if topic.spec.acls.is_empty() {
344 "No ACLs configured".to_string()
345 } else {
346 format!("{} ACL entries applied", topic.spec.acls.len())
347 },
348 last_transition_time: now.clone(),
349 },
350 ];
351
352 RivvenTopicStatus {
353 phase: "Ready".to_string(),
354 message: "Topic is ready".to_string(),
355 current_partitions: info.partitions,
356 current_replication_factor: info.replication_factor,
357 topic_exists: true,
358 observed_generation: topic.metadata.generation.unwrap_or(0),
359 conditions,
360 last_sync_time: Some(now),
361 partition_info: info.partition_info,
362 }
363}
364
365fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
367 let now = Utc::now().to_rfc3339();
368
369 RivvenTopicStatus {
370 phase: "Pending".to_string(),
371 message: message.to_string(),
372 current_partitions: 0,
373 current_replication_factor: 0,
374 topic_exists: false,
375 observed_generation: topic.metadata.generation.unwrap_or(0),
376 conditions: vec![TopicCondition {
377 r#type: "Ready".to_string(),
378 status: "False".to_string(),
379 reason: "Pending".to_string(),
380 message: message.to_string(),
381 last_transition_time: now.clone(),
382 }],
383 last_sync_time: Some(now),
384 partition_info: vec![],
385 }
386}
387
388fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
390 let now = Utc::now().to_rfc3339();
391
392 RivvenTopicStatus {
393 phase: "Failed".to_string(),
394 message: error_msg.to_string(),
395 current_partitions: 0,
396 current_replication_factor: 0,
397 topic_exists: false,
398 observed_generation: topic.metadata.generation.unwrap_or(0),
399 conditions: vec![TopicCondition {
400 r#type: "Ready".to_string(),
401 status: "False".to_string(),
402 reason: "ValidationFailed".to_string(),
403 message: error_msg.to_string(),
404 last_transition_time: now.clone(),
405 }],
406 last_sync_time: Some(now),
407 partition_info: vec![],
408 }
409}
410
411fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
413 let now = Utc::now().to_rfc3339();
414
415 let existing_status = topic.status.clone().unwrap_or_default();
417
418 RivvenTopicStatus {
419 phase: "Error".to_string(),
420 message: error_msg.to_string(),
421 current_partitions: existing_status.current_partitions,
422 current_replication_factor: existing_status.current_replication_factor,
423 topic_exists: existing_status.topic_exists,
424 observed_generation: topic.metadata.generation.unwrap_or(0),
425 conditions: vec![
426 TopicCondition {
427 r#type: "Ready".to_string(),
428 status: if existing_status.topic_exists {
429 "True".to_string()
430 } else {
431 "False".to_string()
432 },
433 reason: "ReconcileError".to_string(),
434 message: error_msg.to_string(),
435 last_transition_time: now.clone(),
436 },
437 TopicCondition {
438 r#type: "Synced".to_string(),
439 status: "False".to_string(),
440 reason: "SyncFailed".to_string(),
441 message: error_msg.to_string(),
442 last_transition_time: now.clone(),
443 },
444 ],
445 last_sync_time: Some(now),
446 partition_info: existing_status.partition_info,
447 }
448}
449
450async fn update_topic_status(
452 client: &Client,
453 namespace: &str,
454 name: &str,
455 status: RivvenTopicStatus,
456) -> Result<()> {
457 let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
458
459 debug!(name = %name, phase = %status.phase, "Updating topic status");
460
461 let patch = serde_json::json!({
462 "status": status
463 });
464
465 let patch_params = PatchParams::default();
466 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
467 .await
468 .map_err(OperatorError::from)?;
469
470 Ok(())
471}
472
473#[instrument(skip(topic, ctx))]
475async fn cleanup_topic(
476 topic: Arc<RivvenTopic>,
477 ctx: Arc<TopicControllerContext>,
478) -> Result<Action> {
479 let name = topic.name_any();
480 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
481
482 info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
483
484 if topic.spec.delete_on_remove {
486 info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
487
488 match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
490 Ok(cluster) => {
491 let endpoints = get_cluster_endpoints(&cluster);
492 if !endpoints.is_empty() {
493 if let Err(e) = delete_topic_from_cluster(&name, &endpoints).await {
495 warn!(
496 name = %name,
497 error = %e,
498 "Failed to delete topic from cluster (may not exist)"
499 );
500 }
501 }
502 }
503 Err(e) => {
504 warn!(
505 name = %name,
506 error = %e,
507 "Cluster not found during cleanup, topic may be orphaned"
508 );
509 }
510 }
511 } else {
512 info!(
513 name = %name,
514 "delete_on_remove is false, topic will remain in cluster"
515 );
516 }
517
518 info!(name = %name, "Topic cleanup complete");
519
520 Ok(Action::await_change())
521}
522
523async fn delete_topic_from_cluster(topic_name: &str, _broker_endpoints: &[String]) -> Result<()> {
525 info!(topic = %topic_name, "Would delete topic from cluster");
527 Ok(())
528}
529
530fn topic_error_policy(
532 _topic: Arc<RivvenTopic>,
533 error: &OperatorError,
534 _ctx: Arc<TopicControllerContext>,
535) -> Action {
536 warn!(
537 error = %error,
538 "Topic reconciliation error, will retry"
539 );
540
541 let delay = error
542 .requeue_delay()
543 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
544
545 Action::requeue(delay)
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// A six-partition, three-replica topic in `default` with no ACLs and no status.
    fn create_test_topic() -> RivvenTopic {
        let metadata = ObjectMeta {
            name: Some("test-topic".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenTopicSpec {
            cluster_ref: ClusterReference {
                name: "test-cluster".to_string(),
                namespace: None,
            },
            partitions: 6,
            replication_factor: 3,
            config: TopicConfig::default(),
            acls: vec![],
            delete_on_remove: true,
            topic_labels: BTreeMap::new(),
        };

        RivvenTopic {
            metadata,
            spec,
            status: None,
        }
    }

    /// Same as `create_test_topic`, plus two `Allow` ACL entries.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let allow = |principal: &str, operations: &[&str]| TopicAcl {
            principal: principal.to_string(),
            operations: operations.iter().map(|op| op.to_string()).collect(),
            permission_type: "Allow".to_string(),
            host: "*".to_string(),
        };

        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            allow("user:app1", &["Read", "Write"]),
            allow("user:analytics", &["Read"]),
        ];
        topic
    }

    /// Partition 0 led by replica 0 with replicas and ISR `[0, 1, 2]`.
    fn sample_partition() -> PartitionInfo {
        PartitionInfo {
            partition: 0,
            leader: 0,
            replicas: vec![0, 1, 2],
            isr: vec![0, 1, 2],
        }
    }

    /// `TopicInfo` for a 6-partition / 3-replica topic.
    fn sample_info(existed: bool, partition_info: Vec<PartitionInfo>) -> TopicInfo {
        TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed,
            partition_info,
        }
    }

    /// Looks up a condition by type, panicking when it is absent.
    fn condition<'a>(status: &'a RivvenTopicStatus, kind: &str) -> &'a TopicCondition {
        status
            .conditions
            .iter()
            .find(|c| c.r#type == kind)
            .expect("condition should be present")
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let status = build_ready_status(&topic, sample_info(false, vec![sample_partition()]));

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        assert_eq!(condition(&status, "Ready").status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let status = build_ready_status(&topic, sample_info(true, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();
        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();
        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = condition(&status, "Ready");
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![sample_partition()],
        });

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();
        let spec = &topic.spec;

        // Mirrors the layout rule inline rather than calling the async reconciler.
        let mut partition_info: Vec<PartitionInfo> = Vec::new();
        for i in 0..spec.partitions {
            partition_info.push(PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..spec.replication_factor).collect(),
                isr: (0..spec.replication_factor).collect(),
            });
        }

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0);
        assert_eq!(partition_info[4].leader, 1);
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let status = build_ready_status(&topic, sample_info(false, vec![]));

        for expected in ["Ready", "Synced", "ConfigApplied", "ACLsApplied"] {
            assert!(status.conditions.iter().any(|c| c.r#type == expected));
        }
    }

    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic();
        let status = build_ready_status(&topic, sample_info(false, vec![]));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}