1use crate::crd::{
7 ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8 RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer name this controller registers on `RivvenConnect` objects.
pub const CONNECT_FINALIZER: &str = "rivven.hupe1980.github.io/connect-finalizer";

/// Requeue interval, in seconds, after a successful reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Fallback requeue interval, in seconds, for errors that carry no delay of their own.
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
36pub struct ConnectControllerContext {
38 pub client: Client,
40 pub metrics: Option<ConnectControllerMetrics>,
42}
43
44#[derive(Clone)]
46pub struct ConnectControllerMetrics {
47 pub reconciliations: metrics::Counter,
49 pub errors: metrics::Counter,
51 pub duration: metrics::Histogram,
53}
54
55impl ConnectControllerMetrics {
56 pub fn new() -> Self {
58 Self {
59 reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60 errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61 duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62 }
63 }
64}
65
66impl Default for ConnectControllerMetrics {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74 let connects: Api<RivvenConnect> = match &namespace {
75 Some(ns) => Api::namespaced(client.clone(), ns),
76 None => Api::all(client.clone()),
77 };
78
79 let ctx = Arc::new(ConnectControllerContext {
80 client: client.clone(),
81 metrics: Some(ConnectControllerMetrics::new()),
82 });
83
84 info!(
85 namespace = namespace.as_deref().unwrap_or("all"),
86 "Starting RivvenConnect controller"
87 );
88
89 let deployments = match &namespace {
91 Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92 None => Api::<Deployment>::all(client.clone()),
93 };
94
95 let configmaps = match &namespace {
96 Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97 None => Api::<ConfigMap>::all(client.clone()),
98 };
99
100 Controller::new(connects.clone(), Config::default())
101 .owns(deployments, Config::default())
102 .owns(configmaps, Config::default())
103 .run(reconcile_connect, connect_error_policy, ctx)
104 .for_each(|result| async move {
105 match result {
106 Ok((obj, action)) => {
107 debug!(
108 name = obj.name,
109 namespace = obj.namespace,
110 ?action,
111 "Connect reconciliation completed"
112 );
113 }
114 Err(e) => {
115 error!(error = %e, "Connect reconciliation failed");
116 }
117 }
118 })
119 .await;
120
121 Ok(())
122}
123
124#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127 connect: Arc<RivvenConnect>,
128 ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130 let start = std::time::Instant::now();
131
132 if let Some(ref metrics) = ctx.metrics {
133 metrics.reconciliations.increment(1);
134 }
135
136 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137 let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139 let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140 match event {
141 FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142 FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143 }
144 })
145 .await;
146
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.duration.record(start.elapsed().as_secs_f64());
149 }
150
151 result.map_err(|e| {
152 if let Some(ref metrics) = ctx.metrics {
153 metrics.errors.increment(1);
154 }
155 OperatorError::ReconcileFailed(e.to_string())
156 })
157}
158
159#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162 connect: Arc<RivvenConnect>,
163 ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165 let name = connect.name_any();
166 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168 info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170 if let Err(errors) = connect.spec.validate() {
172 let error_messages: Vec<String> = errors
173 .field_errors()
174 .iter()
175 .flat_map(|(field, errs)| {
176 errs.iter()
177 .map(move |e| format!("{}: {:?}", field, e.message))
178 })
179 .collect();
180 let error_msg = error_messages.join("; ");
181 warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183 update_connect_status(
185 &ctx.client,
186 &namespace,
187 &name,
188 build_failed_status(&connect, &error_msg),
189 )
190 .await?;
191
192 return Err(OperatorError::InvalidConfig(error_msg));
193 }
194
195 verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198 let configmap = build_connect_configmap(&connect)?;
200 apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202 let deployment = build_connect_deployment(&connect)?;
204 let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206 let service = build_connect_service(&connect);
208 apply_connect_service(&ctx.client, &namespace, service).await?;
209
210 let status = build_connect_status(&connect, deploy_status);
212 update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214 info!(name = %name, "Connect reconciliation complete");
215
216 Ok(Action::requeue(Duration::from_secs(
217 DEFAULT_REQUEUE_SECONDS,
218 )))
219}
220
221async fn verify_cluster_ref(
223 client: &Client,
224 namespace: &str,
225 cluster_ref: &ClusterReference,
226) -> Result<()> {
227 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230 match clusters.get(&cluster_ref.name).await {
231 Ok(_) => Ok(()),
232 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233 format!("{}/{}", cluster_ns, cluster_ref.name),
234 )),
235 Err(e) => Err(OperatorError::from(e)),
236 }
237}
238
239fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241 let name = connect.name_any();
242 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244 let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247 let mut labels = BTreeMap::new();
248 labels.insert(
249 "app.kubernetes.io/name".to_string(),
250 "rivven-connect".to_string(),
251 );
252 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253 labels.insert(
254 "app.kubernetes.io/component".to_string(),
255 "connect".to_string(),
256 );
257 labels.insert(
258 "app.kubernetes.io/managed-by".to_string(),
259 "rivven-operator".to_string(),
260 );
261
262 let mut data = BTreeMap::new();
263 data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265 Ok(ConfigMap {
266 metadata: ObjectMeta {
267 name: Some(format!("rivven-connect-{}", name)),
268 namespace: Some(namespace),
269 labels: Some(labels),
270 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
271 ..Default::default()
272 },
273 data: Some(data),
274 ..Default::default()
275 })
276}
277
278fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
280 use std::fmt::Write;
281
282 let mut yaml = String::new();
283
284 writeln!(yaml, "version: \"1.0\"").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
285 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
286
287 writeln!(yaml, "broker:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
289 writeln!(yaml, " bootstrap_servers:")
290 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
291
292 let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
294 let cluster_svc = format!(
295 "rivven-{}-client.{}.svc.cluster.local:9092",
296 spec.cluster_ref.name, cluster_ns
297 );
298 writeln!(yaml, " - {}", cluster_svc)
299 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
300 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
301
302 if !spec.sources.is_empty() || !spec.sinks.is_empty() {
304 writeln!(yaml, "settings:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
305 writeln!(yaml, " topic:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
306 writeln!(yaml, " auto_create: {}", spec.settings.topic.auto_create)
307 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
308 writeln!(
309 yaml,
310 " default_partitions: {}",
311 spec.settings.topic.default_partitions
312 )
313 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
314 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
315 }
316
317 if !spec.sources.is_empty() {
319 writeln!(yaml, "sources:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
320 for source in &spec.sources {
321 writeln!(yaml, " {}:", source.name)
322 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
323 writeln!(yaml, " connector: {}", source.connector)
324 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
325 writeln!(yaml, " topic: {}", source.topic)
326 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
327 writeln!(yaml, " enabled: {}", source.enabled)
328 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
329
330 let mut config = source.config.clone();
333 if let Some(ref routing) = source.topic_routing {
334 if let serde_json::Value::Object(ref mut map) = config {
335 map.insert("topic_routing".to_string(), serde_json::json!(routing));
336 } else if config.is_null() {
337 config = serde_json::json!({"topic_routing": routing});
338 }
339 }
340
341 if !config.is_null() {
342 writeln!(yaml, " config:")
343 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
344 let config_str = serde_json::to_string_pretty(&config)
346 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
347 for line in config_str.lines() {
348 writeln!(yaml, " {}", line)
349 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
350 }
351 }
352 }
353 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
354 }
355
356 if !spec.sinks.is_empty() {
358 writeln!(yaml, "sinks:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
359 for sink in &spec.sinks {
360 writeln!(yaml, " {}:", sink.name)
361 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
362 writeln!(yaml, " connector: {}", sink.connector)
363 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
364 writeln!(yaml, " topics: {:?}", sink.topics)
365 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
366 writeln!(yaml, " consumer_group: {}", sink.consumer_group)
367 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
368 writeln!(yaml, " enabled: {}", sink.enabled)
369 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
370
371 if !sink.config.is_null() {
373 writeln!(yaml, " config:")
374 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
375 let config_str = serde_json::to_string_pretty(&sink.config)
376 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
377 for line in config_str.lines() {
378 writeln!(yaml, " {}", line)
379 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
380 }
381 }
382 }
383 }
384
385 Ok(yaml)
386}
387
388fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
390 let name = connect.name_any();
391 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
392 let spec = &connect.spec;
393
394 let mut labels = BTreeMap::new();
395 labels.insert(
396 "app.kubernetes.io/name".to_string(),
397 "rivven-connect".to_string(),
398 );
399 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
400 labels.insert(
401 "app.kubernetes.io/component".to_string(),
402 "connect".to_string(),
403 );
404 labels.insert(
405 "app.kubernetes.io/managed-by".to_string(),
406 "rivven-operator".to_string(),
407 );
408
409 for (k, v) in &spec.pod_labels {
411 if !k.starts_with("app.kubernetes.io/") {
412 labels.insert(k.clone(), v.clone());
413 }
414 }
415
416 let image = spec
417 .image
418 .clone()
419 .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
420
421 let container = k8s_openapi::api::core::v1::Container {
423 name: "connect".to_string(),
424 image: Some(image),
425 image_pull_policy: Some(spec.image_pull_policy.clone()),
426 args: Some(vec![
427 "run".to_string(),
428 "--config".to_string(),
429 "/config/pipeline.yaml".to_string(),
430 ]),
431 env: Some(spec.env.clone()),
432 volume_mounts: Some(vec![
433 k8s_openapi::api::core::v1::VolumeMount {
434 name: "config".to_string(),
435 mount_path: "/config".to_string(),
436 read_only: Some(true),
437 ..Default::default()
438 },
439 k8s_openapi::api::core::v1::VolumeMount {
440 name: "data".to_string(),
441 mount_path: "/data".to_string(),
442 ..Default::default()
443 },
444 ]),
445 resources: spec
446 .resources
447 .as_ref()
448 .and_then(|r| serde_json::from_value(r.clone()).ok()),
449 security_context: spec
450 .container_security_context
451 .as_ref()
452 .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
453 liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
454 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
455 path: Some("/health".to_string()),
456 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
457 ..Default::default()
458 }),
459 initial_delay_seconds: Some(30),
460 period_seconds: Some(10),
461 ..Default::default()
462 }),
463 readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
464 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
465 path: Some("/ready".to_string()),
466 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
467 ..Default::default()
468 }),
469 initial_delay_seconds: Some(10),
470 period_seconds: Some(5),
471 ..Default::default()
472 }),
473 ports: Some(vec![
474 k8s_openapi::api::core::v1::ContainerPort {
475 name: Some("http".to_string()),
476 container_port: 8080,
477 ..Default::default()
478 },
479 k8s_openapi::api::core::v1::ContainerPort {
480 name: Some("metrics".to_string()),
481 container_port: 9090,
482 ..Default::default()
483 },
484 ]),
485 ..Default::default()
486 };
487
488 let volumes = vec![
490 k8s_openapi::api::core::v1::Volume {
491 name: "config".to_string(),
492 config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
493 name: format!("rivven-connect-{}", name),
494 ..Default::default()
495 }),
496 ..Default::default()
497 },
498 k8s_openapi::api::core::v1::Volume {
499 name: "data".to_string(),
500 empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
501 ..Default::default()
502 },
503 ];
504
505 let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
507 None
508 } else {
509 Some(
510 spec.image_pull_secrets
511 .iter()
512 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
513 .collect(),
514 )
515 };
516
517 let pod_spec = k8s_openapi::api::core::v1::PodSpec {
518 containers: vec![container],
519 volumes: Some(volumes),
520 image_pull_secrets,
521 service_account_name: spec.service_account.clone(),
522 node_selector: if spec.node_selector.is_empty() {
523 None
524 } else {
525 Some(spec.node_selector.clone())
526 },
527 tolerations: if spec.tolerations.is_empty() {
528 None
529 } else {
530 Some(spec.tolerations.clone())
531 },
532 affinity: spec
533 .affinity
534 .as_ref()
535 .and_then(|a| serde_json::from_value(a.clone()).ok()),
536 security_context: spec
537 .security_context
538 .as_ref()
539 .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
540 ..Default::default()
541 };
542
543 Ok(Deployment {
544 metadata: ObjectMeta {
545 name: Some(format!("rivven-connect-{}", name)),
546 namespace: Some(namespace),
547 labels: Some(labels.clone()),
548 annotations: Some(spec.pod_annotations.clone()),
549 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
550 ..Default::default()
551 },
552 spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
553 replicas: Some(spec.replicas),
554 selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
555 match_labels: Some(labels.clone()),
556 ..Default::default()
557 },
558 template: k8s_openapi::api::core::v1::PodTemplateSpec {
559 metadata: Some(ObjectMeta {
560 labels: Some(labels),
561 annotations: Some(spec.pod_annotations.clone()),
562 ..Default::default()
563 }),
564 spec: Some(pod_spec),
565 },
566 ..Default::default()
567 }),
568 ..Default::default()
569 })
570}
571
572fn build_connect_service(connect: &RivvenConnect) -> Service {
574 let name = connect.name_any();
575 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
576
577 let mut labels = BTreeMap::new();
578 labels.insert(
579 "app.kubernetes.io/name".to_string(),
580 "rivven-connect".to_string(),
581 );
582 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
583 labels.insert(
584 "app.kubernetes.io/component".to_string(),
585 "connect".to_string(),
586 );
587 labels.insert(
588 "app.kubernetes.io/managed-by".to_string(),
589 "rivven-operator".to_string(),
590 );
591
592 Service {
593 metadata: ObjectMeta {
594 name: Some(format!("rivven-connect-{}", name)),
595 namespace: Some(namespace),
596 labels: Some(labels.clone()),
597 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
598 ..Default::default()
599 },
600 spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
601 selector: Some(labels),
602 ports: Some(vec![
603 k8s_openapi::api::core::v1::ServicePort {
604 name: Some("http".to_string()),
605 port: 8080,
606 target_port: Some(
607 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
608 ),
609 ..Default::default()
610 },
611 k8s_openapi::api::core::v1::ServicePort {
612 name: Some("metrics".to_string()),
613 port: 9090,
614 target_port: Some(
615 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
616 ),
617 ..Default::default()
618 },
619 ]),
620 ..Default::default()
621 }),
622 ..Default::default()
623 }
624}
625
626async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
628 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
629 let name = cm.metadata.name.as_ref().unwrap();
630
631 debug!(name = %name, "Applying Connect ConfigMap");
632
633 let patch_params = PatchParams::apply("rivven-operator").force();
634 api.patch(name, &patch_params, &Patch::Apply(&cm))
635 .await
636 .map_err(OperatorError::from)?;
637
638 Ok(())
639}
640
641async fn apply_connect_deployment(
643 client: &Client,
644 namespace: &str,
645 deployment: Deployment,
646) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
647 let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
648 let name = deployment.metadata.name.as_ref().unwrap();
649
650 debug!(name = %name, "Applying Connect Deployment");
651
652 let patch_params = PatchParams::apply("rivven-operator").force();
653 let result = api
654 .patch(name, &patch_params, &Patch::Apply(&deployment))
655 .await
656 .map_err(OperatorError::from)?;
657
658 Ok(result.status)
659}
660
661async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
663 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
664 let name = svc.metadata.name.as_ref().unwrap();
665
666 debug!(name = %name, "Applying Connect Service");
667
668 let patch_params = PatchParams::apply("rivven-operator").force();
669 api.patch(name, &patch_params, &Patch::Apply(&svc))
670 .await
671 .map_err(OperatorError::from)?;
672
673 Ok(())
674}
675
676fn build_connect_status(
678 connect: &RivvenConnect,
679 deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
680) -> RivvenConnectStatus {
681 let now = Utc::now().to_rfc3339();
682 let spec = &connect.spec;
683
684 let (replicas, ready_replicas, updated_replicas) = deploy_status
685 .map(|s| {
686 (
687 s.replicas.unwrap_or(0),
688 s.ready_replicas.unwrap_or(0),
689 s.updated_replicas.unwrap_or(0),
690 )
691 })
692 .unwrap_or((0, 0, 0));
693
694 let desired_replicas = spec.replicas;
695
696 let phase = if ready_replicas == 0 {
698 ConnectPhase::Pending
699 } else if ready_replicas < desired_replicas {
700 if updated_replicas < desired_replicas {
701 ConnectPhase::Starting
702 } else {
703 ConnectPhase::Degraded
704 }
705 } else {
706 ConnectPhase::Running
707 };
708
709 let sources_total = spec.sources.len() as i32;
711 let sinks_total = spec.sinks.len() as i32;
712
713 let sources_running = if phase == ConnectPhase::Running {
715 spec.sources.iter().filter(|s| s.enabled).count() as i32
716 } else {
717 0
718 };
719 let sinks_running = if phase == ConnectPhase::Running {
720 spec.sinks.iter().filter(|s| s.enabled).count() as i32
721 } else {
722 0
723 };
724
725 let mut conditions = vec![];
727
728 conditions.push(ConnectCondition {
729 condition_type: "Ready".to_string(),
730 status: if ready_replicas >= desired_replicas {
731 "True".to_string()
732 } else {
733 "False".to_string()
734 },
735 reason: Some(format!(
736 "{}/{} replicas ready",
737 ready_replicas, desired_replicas
738 )),
739 message: None,
740 last_transition_time: Some(now.clone()),
741 });
742
743 conditions.push(ConnectCondition {
744 condition_type: "BrokerConnected".to_string(),
745 status: if phase == ConnectPhase::Running {
746 "True".to_string()
747 } else {
748 "Unknown".to_string()
749 },
750 reason: Some("ClusterRefValid".to_string()),
751 message: None,
752 last_transition_time: Some(now.clone()),
753 });
754
755 conditions.push(ConnectCondition {
756 condition_type: "SourcesHealthy".to_string(),
757 status: if sources_running == sources_total && sources_total > 0 {
758 "True".to_string()
759 } else if sources_running > 0 {
760 "Partial".to_string()
761 } else if sources_total == 0 {
762 "N/A".to_string()
763 } else {
764 "False".to_string()
765 },
766 reason: Some(format!(
767 "{}/{} sources running",
768 sources_running, sources_total
769 )),
770 message: None,
771 last_transition_time: Some(now.clone()),
772 });
773
774 conditions.push(ConnectCondition {
775 condition_type: "SinksHealthy".to_string(),
776 status: if sinks_running == sinks_total && sinks_total > 0 {
777 "True".to_string()
778 } else if sinks_running > 0 {
779 "Partial".to_string()
780 } else if sinks_total == 0 {
781 "N/A".to_string()
782 } else {
783 "False".to_string()
784 },
785 reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
786 message: None,
787 last_transition_time: Some(now.clone()),
788 });
789
790 let mut connector_statuses = Vec::new();
792
793 for source in &spec.sources {
794 connector_statuses.push(ConnectorStatus {
795 name: source.name.clone(),
796 connector_type: "source".to_string(),
797 kind: source.connector.clone(),
798 state: if source.enabled && phase == ConnectPhase::Running {
799 "running".to_string()
800 } else if !source.enabled {
801 "disabled".to_string()
802 } else {
803 "pending".to_string()
804 },
805 events_processed: 0, last_error: None,
807 last_success_time: None,
808 });
809 }
810
811 for sink in &spec.sinks {
812 connector_statuses.push(ConnectorStatus {
813 name: sink.name.clone(),
814 connector_type: "sink".to_string(),
815 kind: sink.connector.clone(),
816 state: if sink.enabled && phase == ConnectPhase::Running {
817 "running".to_string()
818 } else if !sink.enabled {
819 "disabled".to_string()
820 } else {
821 "pending".to_string()
822 },
823 events_processed: 0,
824 last_error: None,
825 last_success_time: None,
826 });
827 }
828
829 RivvenConnectStatus {
830 phase,
831 replicas,
832 ready_replicas,
833 sources_running,
834 sinks_running,
835 sources_total,
836 sinks_total,
837 observed_generation: connect.metadata.generation.unwrap_or(0),
838 conditions,
839 connector_statuses,
840 last_updated: Some(now),
841 message: None,
842 }
843}
844
845fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
847 let now = Utc::now().to_rfc3339();
848
849 RivvenConnectStatus {
850 phase: ConnectPhase::Failed,
851 replicas: 0,
852 ready_replicas: 0,
853 sources_running: 0,
854 sinks_running: 0,
855 sources_total: connect.spec.sources.len() as i32,
856 sinks_total: connect.spec.sinks.len() as i32,
857 observed_generation: connect.metadata.generation.unwrap_or(0),
858 conditions: vec![ConnectCondition {
859 condition_type: "Ready".to_string(),
860 status: "False".to_string(),
861 reason: Some("ValidationFailed".to_string()),
862 message: Some(error_msg.to_string()),
863 last_transition_time: Some(now.clone()),
864 }],
865 connector_statuses: vec![],
866 last_updated: Some(now),
867 message: Some(error_msg.to_string()),
868 }
869}
870
871async fn update_connect_status(
873 client: &Client,
874 namespace: &str,
875 name: &str,
876 status: RivvenConnectStatus,
877) -> Result<()> {
878 let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
879
880 debug!(name = %name, phase = ?status.phase, "Updating connect status");
881
882 let patch = serde_json::json!({
883 "status": status
884 });
885
886 let patch_params = PatchParams::default();
887 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
888 .await
889 .map_err(OperatorError::from)?;
890
891 Ok(())
892}
893
894#[instrument(skip(connect, _ctx))]
896async fn cleanup_connect(
897 connect: Arc<RivvenConnect>,
898 _ctx: Arc<ConnectControllerContext>,
899) -> Result<Action> {
900 let name = connect.name_any();
901 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
902
903 info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
904
905 info!(name = %name, "Connect cleanup complete");
912
913 Ok(Action::await_change())
914}
915
916fn connect_error_policy(
918 _connect: Arc<RivvenConnect>,
919 error: &OperatorError,
920 _ctx: Arc<ConnectControllerContext>,
921) -> Action {
922 warn!(
923 error = %error,
924 "Connect reconciliation error, will retry"
925 );
926
927 let delay = error
928 .requeue_delay()
929 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
930
931 Action::requeue(delay)
932}
933
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
        SourceConnectorSpec, SourceTopicConfigSpec,
    };

    /// Name every generated child object is expected to carry for the test fixture.
    const CHILD_NAME: &str = "rivven-connect-test-connect";

    /// A two-replica `RivvenConnect` with one enabled source and one enabled sink.
    fn create_test_connect() -> RivvenConnect {
        let source = SourceConnectorSpec {
            name: "test-source".to_string(),
            connector: "datagen".to_string(),
            topic: "test-topic".to_string(),
            topic_routing: None,
            enabled: true,
            config: serde_json::Value::Null,
            config_secret_ref: None,
            topic_config: SourceTopicConfigSpec::default(),
        };

        let sink = SinkConnectorSpec {
            name: "test-sink".to_string(),
            connector: "stdout".to_string(),
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            enabled: true,
            start_offset: "latest".to_string(),
            config: serde_json::Value::Null,
            config_secret_ref: None,
            rate_limit: RateLimitSpec::default(),
        };

        RivvenConnect {
            metadata: ObjectMeta {
                name: Some("test-connect".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenConnectSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                replicas: 2,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                resources: None,
                config: ConnectConfigSpec::default(),
                sources: vec![source],
                sinks: vec![sink],
                settings: GlobalConnectSettings::default(),
                tls: ConnectTlsSpec::default(),
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                affinity: None,
                service_account: None,
                security_context: None,
                container_security_context: None,
            },
            status: None,
        }
    }

    /// Status of the test fixture for a Deployment reporting the given replica counters.
    fn status_for(replicas: i32, ready: i32, updated: i32) -> RivvenConnectStatus {
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(replicas),
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        };
        build_connect_status(&create_test_connect(), Some(deploy_status))
    }

    #[test]
    fn test_build_connect_status_pending() {
        let status = build_connect_status(&create_test_connect(), None);

        assert_eq!(status.phase, ConnectPhase::Pending);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.sources_total, 1);
        assert_eq!(status.sinks_total, 1);
    }

    #[test]
    fn test_build_connect_status_running() {
        let status = status_for(2, 2, 2);

        assert_eq!(status.phase, ConnectPhase::Running);
        assert_eq!(status.replicas, 2);
        assert_eq!(status.ready_replicas, 2);
        assert_eq!(status.sources_running, 1);
        assert_eq!(status.sinks_running, 1);
    }

    #[test]
    fn test_build_connect_status_degraded() {
        let status = status_for(2, 1, 2);

        assert_eq!(status.phase, ConnectPhase::Degraded);
    }

    #[test]
    fn test_build_failed_status() {
        let status = build_failed_status(&create_test_connect(), "Test error");

        assert_eq!(status.phase, ConnectPhase::Failed);
        assert_eq!(status.message, Some("Test error".to_string()));
        assert_eq!(status.conditions.len(), 1);
        assert_eq!(status.conditions[0].status, "False");
    }

    #[test]
    fn test_build_pipeline_yaml() {
        let yaml = build_pipeline_yaml(&create_test_connect().spec).unwrap();

        let expected_fragments = [
            "version: \"1.0\"",
            "bootstrap_servers:",
            "rivven-test-cluster-client",
            "sources:",
            "test-source:",
            "sinks:",
            "test-sink:",
        ];
        for fragment in expected_fragments.iter() {
            assert!(yaml.contains(fragment));
        }
    }

    #[test]
    fn test_build_connect_configmap() {
        let cm = build_connect_configmap(&create_test_connect()).unwrap();

        assert_eq!(cm.metadata.name, Some(CHILD_NAME.to_string()));
        assert!(cm.data.unwrap().contains_key("pipeline.yaml"));
    }

    #[test]
    fn test_build_connect_deployment() {
        let deployment = build_connect_deployment(&create_test_connect()).unwrap();

        assert_eq!(deployment.metadata.name, Some(CHILD_NAME.to_string()));
        assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(2));
    }

    #[test]
    fn test_build_connect_service() {
        let service = build_connect_service(&create_test_connect());

        assert_eq!(service.metadata.name, Some(CHILD_NAME.to_string()));
        let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2);
    }

    #[test]
    fn test_connector_statuses() {
        let status = status_for(2, 2, 2);

        assert_eq!(status.connector_statuses.len(), 2);

        for (connector_name, connector_type) in [("test-source", "source"), ("test-sink", "sink")].iter() {
            let entry = status
                .connector_statuses
                .iter()
                .find(|s| s.name == *connector_name)
                .unwrap();
            assert_eq!(entry.connector_type, *connector_type);
            assert_eq!(entry.state, "running");
        }
    }

    #[test]
    fn test_conditions() {
        let status = status_for(2, 2, 2);

        assert_eq!(status.conditions.len(), 4);

        for condition_type in ["Ready", "BrokerConnected"].iter() {
            let cond = status
                .conditions
                .iter()
                .find(|c| c.condition_type == *condition_type)
                .unwrap();
            assert_eq!(cond.status, "True");
        }
    }
}