1use crate::crd::{
7 ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8 RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer name passed to the kube `finalizer` helper when reconciling
/// `RivvenConnect` resources.
pub const CONNECT_FINALIZER: &str = "rivven.io/connect-finalizer";

/// Requeue interval, in seconds, requested after a successful reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 60;
/// Fallback requeue interval, in seconds, used by the error policy when the
/// error itself does not provide a delay.
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
/// Shared state handed to every reconciliation of a `RivvenConnect` resource.
pub struct ConnectControllerContext {
    /// Kubernetes client used for all API calls made while reconciling.
    pub client: Client,
    /// Optional metric handles; when `None`, reconciliations record no metrics.
    pub metrics: Option<ConnectControllerMetrics>,
}
43
/// Metric handles updated by the connect controller's reconcile loop.
#[derive(Clone)]
pub struct ConnectControllerMetrics {
    /// Incremented once at the start of every reconciliation.
    pub reconciliations: metrics::Counter,
    /// Incremented whenever a reconciliation returns an error.
    pub errors: metrics::Counter,
    /// Records the wall-clock duration of each reconciliation, in seconds.
    pub duration: metrics::Histogram,
}
54
55impl ConnectControllerMetrics {
56 pub fn new() -> Self {
58 Self {
59 reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60 errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61 duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62 }
63 }
64}
65
66impl Default for ConnectControllerMetrics {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74 let connects: Api<RivvenConnect> = match &namespace {
75 Some(ns) => Api::namespaced(client.clone(), ns),
76 None => Api::all(client.clone()),
77 };
78
79 let ctx = Arc::new(ConnectControllerContext {
80 client: client.clone(),
81 metrics: Some(ConnectControllerMetrics::new()),
82 });
83
84 info!(
85 namespace = namespace.as_deref().unwrap_or("all"),
86 "Starting RivvenConnect controller"
87 );
88
89 let deployments = match &namespace {
91 Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92 None => Api::<Deployment>::all(client.clone()),
93 };
94
95 let configmaps = match &namespace {
96 Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97 None => Api::<ConfigMap>::all(client.clone()),
98 };
99
100 Controller::new(connects.clone(), Config::default())
101 .owns(deployments, Config::default())
102 .owns(configmaps, Config::default())
103 .run(reconcile_connect, connect_error_policy, ctx)
104 .for_each(|result| async move {
105 match result {
106 Ok((obj, action)) => {
107 debug!(
108 name = obj.name,
109 namespace = obj.namespace,
110 ?action,
111 "Connect reconciliation completed"
112 );
113 }
114 Err(e) => {
115 error!(error = %e, "Connect reconciliation failed");
116 }
117 }
118 })
119 .await;
120
121 Ok(())
122}
123
124#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127 connect: Arc<RivvenConnect>,
128 ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130 let start = std::time::Instant::now();
131
132 if let Some(ref metrics) = ctx.metrics {
133 metrics.reconciliations.increment(1);
134 }
135
136 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137 let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139 let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140 match event {
141 FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142 FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143 }
144 })
145 .await;
146
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.duration.record(start.elapsed().as_secs_f64());
149 }
150
151 result.map_err(|e| {
152 if let Some(ref metrics) = ctx.metrics {
153 metrics.errors.increment(1);
154 }
155 OperatorError::ReconcileFailed(e.to_string())
156 })
157}
158
159#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162 connect: Arc<RivvenConnect>,
163 ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165 let name = connect.name_any();
166 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168 info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170 if let Err(errors) = connect.spec.validate() {
172 let error_messages: Vec<String> = errors
173 .field_errors()
174 .iter()
175 .flat_map(|(field, errs)| {
176 errs.iter()
177 .map(move |e| format!("{}: {:?}", field, e.message))
178 })
179 .collect();
180 let error_msg = error_messages.join("; ");
181 warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183 update_connect_status(
185 &ctx.client,
186 &namespace,
187 &name,
188 build_failed_status(&connect, &error_msg),
189 )
190 .await?;
191
192 return Err(OperatorError::InvalidConfig(error_msg));
193 }
194
195 verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198 let configmap = build_connect_configmap(&connect)?;
200 apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202 let deployment = build_connect_deployment(&connect)?;
204 let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206 let service = build_connect_service(&connect);
208 apply_connect_service(&ctx.client, &namespace, service).await?;
209
210 let status = build_connect_status(&connect, deploy_status);
212 update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214 info!(name = %name, "Connect reconciliation complete");
215
216 Ok(Action::requeue(Duration::from_secs(
217 DEFAULT_REQUEUE_SECONDS,
218 )))
219}
220
221async fn verify_cluster_ref(
223 client: &Client,
224 namespace: &str,
225 cluster_ref: &ClusterReference,
226) -> Result<()> {
227 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230 match clusters.get(&cluster_ref.name).await {
231 Ok(_) => Ok(()),
232 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233 format!("{}/{}", cluster_ns, cluster_ref.name),
234 )),
235 Err(e) => Err(OperatorError::from(e)),
236 }
237}
238
239fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241 let name = connect.name_any();
242 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244 let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247 let mut labels = BTreeMap::new();
248 labels.insert(
249 "app.kubernetes.io/name".to_string(),
250 "rivven-connect".to_string(),
251 );
252 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253 labels.insert(
254 "app.kubernetes.io/component".to_string(),
255 "connect".to_string(),
256 );
257 labels.insert(
258 "app.kubernetes.io/managed-by".to_string(),
259 "rivven-operator".to_string(),
260 );
261
262 let mut data = BTreeMap::new();
263 data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265 Ok(ConfigMap {
266 metadata: ObjectMeta {
267 name: Some(format!("rivven-connect-{}", name)),
268 namespace: Some(namespace),
269 labels: Some(labels),
270 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
271 ..Default::default()
272 },
273 data: Some(data),
274 ..Default::default()
275 })
276}
277
278fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
280 use std::fmt::Write;
281
282 let mut yaml = String::new();
283
284 writeln!(yaml, "version: \"1.0\"").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
285 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
286
287 writeln!(yaml, "broker:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
289 writeln!(yaml, " bootstrap_servers:")
290 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
291
292 let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
294 let cluster_svc = format!(
295 "rivven-{}-client.{}.svc.cluster.local:9092",
296 spec.cluster_ref.name, cluster_ns
297 );
298 writeln!(yaml, " - {}", cluster_svc)
299 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
300 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
301
302 if !spec.sources.is_empty() || !spec.sinks.is_empty() {
304 writeln!(yaml, "settings:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
305 writeln!(yaml, " topic:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
306 writeln!(yaml, " auto_create: {}", spec.settings.topic.auto_create)
307 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
308 writeln!(
309 yaml,
310 " default_partitions: {}",
311 spec.settings.topic.default_partitions
312 )
313 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
314 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
315 }
316
317 if !spec.sources.is_empty() {
319 writeln!(yaml, "sources:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
320 for source in &spec.sources {
321 writeln!(yaml, " {}:", source.name)
322 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
323 writeln!(yaml, " connector: {}", source.connector)
324 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
325 writeln!(yaml, " topic: {}", source.topic)
326 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
327 writeln!(yaml, " enabled: {}", source.enabled)
328 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
329
330 if !source.config.is_null() {
332 writeln!(yaml, " config:")
333 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
334 let config_str = serde_json::to_string_pretty(&source.config)
336 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
337 for line in config_str.lines() {
338 writeln!(yaml, " {}", line)
339 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
340 }
341 }
342 }
343 writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
344 }
345
346 if !spec.sinks.is_empty() {
348 writeln!(yaml, "sinks:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
349 for sink in &spec.sinks {
350 writeln!(yaml, " {}:", sink.name)
351 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
352 writeln!(yaml, " connector: {}", sink.connector)
353 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
354 writeln!(yaml, " topics: {:?}", sink.topics)
355 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
356 writeln!(yaml, " consumer_group: {}", sink.consumer_group)
357 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
358 writeln!(yaml, " enabled: {}", sink.enabled)
359 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
360
361 if !sink.config.is_null() {
363 writeln!(yaml, " config:")
364 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
365 let config_str = serde_json::to_string_pretty(&sink.config)
366 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
367 for line in config_str.lines() {
368 writeln!(yaml, " {}", line)
369 .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
370 }
371 }
372 }
373 }
374
375 Ok(yaml)
376}
377
378fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
380 let name = connect.name_any();
381 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
382 let spec = &connect.spec;
383
384 let mut labels = BTreeMap::new();
385 labels.insert(
386 "app.kubernetes.io/name".to_string(),
387 "rivven-connect".to_string(),
388 );
389 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
390 labels.insert(
391 "app.kubernetes.io/component".to_string(),
392 "connect".to_string(),
393 );
394 labels.insert(
395 "app.kubernetes.io/managed-by".to_string(),
396 "rivven-operator".to_string(),
397 );
398
399 for (k, v) in &spec.pod_labels {
401 if !k.starts_with("app.kubernetes.io/") {
402 labels.insert(k.clone(), v.clone());
403 }
404 }
405
406 let image = spec
407 .image
408 .clone()
409 .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
410
411 let container = k8s_openapi::api::core::v1::Container {
413 name: "connect".to_string(),
414 image: Some(image),
415 image_pull_policy: Some(spec.image_pull_policy.clone()),
416 args: Some(vec![
417 "run".to_string(),
418 "--config".to_string(),
419 "/config/pipeline.yaml".to_string(),
420 ]),
421 env: Some(spec.env.clone()),
422 volume_mounts: Some(vec![
423 k8s_openapi::api::core::v1::VolumeMount {
424 name: "config".to_string(),
425 mount_path: "/config".to_string(),
426 read_only: Some(true),
427 ..Default::default()
428 },
429 k8s_openapi::api::core::v1::VolumeMount {
430 name: "data".to_string(),
431 mount_path: "/data".to_string(),
432 ..Default::default()
433 },
434 ]),
435 resources: spec
436 .resources
437 .as_ref()
438 .and_then(|r| serde_json::from_value(r.clone()).ok()),
439 security_context: spec
440 .container_security_context
441 .as_ref()
442 .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
443 liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
444 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
445 path: Some("/health".to_string()),
446 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
447 ..Default::default()
448 }),
449 initial_delay_seconds: Some(30),
450 period_seconds: Some(10),
451 ..Default::default()
452 }),
453 readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
454 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
455 path: Some("/ready".to_string()),
456 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
457 ..Default::default()
458 }),
459 initial_delay_seconds: Some(10),
460 period_seconds: Some(5),
461 ..Default::default()
462 }),
463 ports: Some(vec![
464 k8s_openapi::api::core::v1::ContainerPort {
465 name: Some("http".to_string()),
466 container_port: 8080,
467 ..Default::default()
468 },
469 k8s_openapi::api::core::v1::ContainerPort {
470 name: Some("metrics".to_string()),
471 container_port: 9090,
472 ..Default::default()
473 },
474 ]),
475 ..Default::default()
476 };
477
478 let volumes = vec![
480 k8s_openapi::api::core::v1::Volume {
481 name: "config".to_string(),
482 config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
483 name: format!("rivven-connect-{}", name),
484 ..Default::default()
485 }),
486 ..Default::default()
487 },
488 k8s_openapi::api::core::v1::Volume {
489 name: "data".to_string(),
490 empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
491 ..Default::default()
492 },
493 ];
494
495 let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
497 None
498 } else {
499 Some(
500 spec.image_pull_secrets
501 .iter()
502 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
503 .collect(),
504 )
505 };
506
507 let pod_spec = k8s_openapi::api::core::v1::PodSpec {
508 containers: vec![container],
509 volumes: Some(volumes),
510 image_pull_secrets,
511 service_account_name: spec.service_account.clone(),
512 node_selector: if spec.node_selector.is_empty() {
513 None
514 } else {
515 Some(spec.node_selector.clone())
516 },
517 tolerations: if spec.tolerations.is_empty() {
518 None
519 } else {
520 Some(spec.tolerations.clone())
521 },
522 affinity: spec
523 .affinity
524 .as_ref()
525 .and_then(|a| serde_json::from_value(a.clone()).ok()),
526 security_context: spec
527 .security_context
528 .as_ref()
529 .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
530 ..Default::default()
531 };
532
533 Ok(Deployment {
534 metadata: ObjectMeta {
535 name: Some(format!("rivven-connect-{}", name)),
536 namespace: Some(namespace),
537 labels: Some(labels.clone()),
538 annotations: Some(spec.pod_annotations.clone()),
539 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
540 ..Default::default()
541 },
542 spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
543 replicas: Some(spec.replicas),
544 selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
545 match_labels: Some(labels.clone()),
546 ..Default::default()
547 },
548 template: k8s_openapi::api::core::v1::PodTemplateSpec {
549 metadata: Some(ObjectMeta {
550 labels: Some(labels),
551 annotations: Some(spec.pod_annotations.clone()),
552 ..Default::default()
553 }),
554 spec: Some(pod_spec),
555 },
556 ..Default::default()
557 }),
558 ..Default::default()
559 })
560}
561
562fn build_connect_service(connect: &RivvenConnect) -> Service {
564 let name = connect.name_any();
565 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
566
567 let mut labels = BTreeMap::new();
568 labels.insert(
569 "app.kubernetes.io/name".to_string(),
570 "rivven-connect".to_string(),
571 );
572 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
573 labels.insert(
574 "app.kubernetes.io/component".to_string(),
575 "connect".to_string(),
576 );
577 labels.insert(
578 "app.kubernetes.io/managed-by".to_string(),
579 "rivven-operator".to_string(),
580 );
581
582 Service {
583 metadata: ObjectMeta {
584 name: Some(format!("rivven-connect-{}", name)),
585 namespace: Some(namespace),
586 labels: Some(labels.clone()),
587 owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
588 ..Default::default()
589 },
590 spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
591 selector: Some(labels),
592 ports: Some(vec![
593 k8s_openapi::api::core::v1::ServicePort {
594 name: Some("http".to_string()),
595 port: 8080,
596 target_port: Some(
597 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
598 ),
599 ..Default::default()
600 },
601 k8s_openapi::api::core::v1::ServicePort {
602 name: Some("metrics".to_string()),
603 port: 9090,
604 target_port: Some(
605 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
606 ),
607 ..Default::default()
608 },
609 ]),
610 ..Default::default()
611 }),
612 ..Default::default()
613 }
614}
615
616async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
618 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
619 let name = cm.metadata.name.as_ref().unwrap();
620
621 debug!(name = %name, "Applying Connect ConfigMap");
622
623 let patch_params = PatchParams::apply("rivven-operator").force();
624 api.patch(name, &patch_params, &Patch::Apply(&cm))
625 .await
626 .map_err(OperatorError::from)?;
627
628 Ok(())
629}
630
631async fn apply_connect_deployment(
633 client: &Client,
634 namespace: &str,
635 deployment: Deployment,
636) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
637 let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
638 let name = deployment.metadata.name.as_ref().unwrap();
639
640 debug!(name = %name, "Applying Connect Deployment");
641
642 let patch_params = PatchParams::apply("rivven-operator").force();
643 let result = api
644 .patch(name, &patch_params, &Patch::Apply(&deployment))
645 .await
646 .map_err(OperatorError::from)?;
647
648 Ok(result.status)
649}
650
651async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
653 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
654 let name = svc.metadata.name.as_ref().unwrap();
655
656 debug!(name = %name, "Applying Connect Service");
657
658 let patch_params = PatchParams::apply("rivven-operator").force();
659 api.patch(name, &patch_params, &Patch::Apply(&svc))
660 .await
661 .map_err(OperatorError::from)?;
662
663 Ok(())
664}
665
666fn build_connect_status(
668 connect: &RivvenConnect,
669 deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
670) -> RivvenConnectStatus {
671 let now = Utc::now().to_rfc3339();
672 let spec = &connect.spec;
673
674 let (replicas, ready_replicas, updated_replicas) = deploy_status
675 .map(|s| {
676 (
677 s.replicas.unwrap_or(0),
678 s.ready_replicas.unwrap_or(0),
679 s.updated_replicas.unwrap_or(0),
680 )
681 })
682 .unwrap_or((0, 0, 0));
683
684 let desired_replicas = spec.replicas;
685
686 let phase = if ready_replicas == 0 {
688 ConnectPhase::Pending
689 } else if ready_replicas < desired_replicas {
690 if updated_replicas < desired_replicas {
691 ConnectPhase::Starting
692 } else {
693 ConnectPhase::Degraded
694 }
695 } else {
696 ConnectPhase::Running
697 };
698
699 let sources_total = spec.sources.len() as i32;
701 let sinks_total = spec.sinks.len() as i32;
702
703 let sources_running = if phase == ConnectPhase::Running {
705 spec.sources.iter().filter(|s| s.enabled).count() as i32
706 } else {
707 0
708 };
709 let sinks_running = if phase == ConnectPhase::Running {
710 spec.sinks.iter().filter(|s| s.enabled).count() as i32
711 } else {
712 0
713 };
714
715 let mut conditions = vec![];
717
718 conditions.push(ConnectCondition {
719 condition_type: "Ready".to_string(),
720 status: if ready_replicas >= desired_replicas {
721 "True".to_string()
722 } else {
723 "False".to_string()
724 },
725 reason: Some(format!(
726 "{}/{} replicas ready",
727 ready_replicas, desired_replicas
728 )),
729 message: None,
730 last_transition_time: Some(now.clone()),
731 });
732
733 conditions.push(ConnectCondition {
734 condition_type: "BrokerConnected".to_string(),
735 status: if phase == ConnectPhase::Running {
736 "True".to_string()
737 } else {
738 "Unknown".to_string()
739 },
740 reason: Some("ClusterRefValid".to_string()),
741 message: None,
742 last_transition_time: Some(now.clone()),
743 });
744
745 conditions.push(ConnectCondition {
746 condition_type: "SourcesHealthy".to_string(),
747 status: if sources_running == sources_total && sources_total > 0 {
748 "True".to_string()
749 } else if sources_running > 0 {
750 "Partial".to_string()
751 } else if sources_total == 0 {
752 "N/A".to_string()
753 } else {
754 "False".to_string()
755 },
756 reason: Some(format!(
757 "{}/{} sources running",
758 sources_running, sources_total
759 )),
760 message: None,
761 last_transition_time: Some(now.clone()),
762 });
763
764 conditions.push(ConnectCondition {
765 condition_type: "SinksHealthy".to_string(),
766 status: if sinks_running == sinks_total && sinks_total > 0 {
767 "True".to_string()
768 } else if sinks_running > 0 {
769 "Partial".to_string()
770 } else if sinks_total == 0 {
771 "N/A".to_string()
772 } else {
773 "False".to_string()
774 },
775 reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
776 message: None,
777 last_transition_time: Some(now.clone()),
778 });
779
780 let mut connector_statuses = Vec::new();
782
783 for source in &spec.sources {
784 connector_statuses.push(ConnectorStatus {
785 name: source.name.clone(),
786 connector_type: "source".to_string(),
787 kind: source.connector.clone(),
788 state: if source.enabled && phase == ConnectPhase::Running {
789 "running".to_string()
790 } else if !source.enabled {
791 "disabled".to_string()
792 } else {
793 "pending".to_string()
794 },
795 events_processed: 0, last_error: None,
797 last_success_time: None,
798 });
799 }
800
801 for sink in &spec.sinks {
802 connector_statuses.push(ConnectorStatus {
803 name: sink.name.clone(),
804 connector_type: "sink".to_string(),
805 kind: sink.connector.clone(),
806 state: if sink.enabled && phase == ConnectPhase::Running {
807 "running".to_string()
808 } else if !sink.enabled {
809 "disabled".to_string()
810 } else {
811 "pending".to_string()
812 },
813 events_processed: 0,
814 last_error: None,
815 last_success_time: None,
816 });
817 }
818
819 RivvenConnectStatus {
820 phase,
821 replicas,
822 ready_replicas,
823 sources_running,
824 sinks_running,
825 sources_total,
826 sinks_total,
827 observed_generation: connect.metadata.generation.unwrap_or(0),
828 conditions,
829 connector_statuses,
830 last_updated: Some(now),
831 message: None,
832 }
833}
834
835fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
837 let now = Utc::now().to_rfc3339();
838
839 RivvenConnectStatus {
840 phase: ConnectPhase::Failed,
841 replicas: 0,
842 ready_replicas: 0,
843 sources_running: 0,
844 sinks_running: 0,
845 sources_total: connect.spec.sources.len() as i32,
846 sinks_total: connect.spec.sinks.len() as i32,
847 observed_generation: connect.metadata.generation.unwrap_or(0),
848 conditions: vec![ConnectCondition {
849 condition_type: "Ready".to_string(),
850 status: "False".to_string(),
851 reason: Some("ValidationFailed".to_string()),
852 message: Some(error_msg.to_string()),
853 last_transition_time: Some(now.clone()),
854 }],
855 connector_statuses: vec![],
856 last_updated: Some(now),
857 message: Some(error_msg.to_string()),
858 }
859}
860
861async fn update_connect_status(
863 client: &Client,
864 namespace: &str,
865 name: &str,
866 status: RivvenConnectStatus,
867) -> Result<()> {
868 let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
869
870 debug!(name = %name, phase = ?status.phase, "Updating connect status");
871
872 let patch = serde_json::json!({
873 "status": status
874 });
875
876 let patch_params = PatchParams::default();
877 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
878 .await
879 .map_err(OperatorError::from)?;
880
881 Ok(())
882}
883
884#[instrument(skip(connect, _ctx))]
886async fn cleanup_connect(
887 connect: Arc<RivvenConnect>,
888 _ctx: Arc<ConnectControllerContext>,
889) -> Result<Action> {
890 let name = connect.name_any();
891 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
892
893 info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
894
895 info!(name = %name, "Connect cleanup complete");
902
903 Ok(Action::await_change())
904}
905
906fn connect_error_policy(
908 _connect: Arc<RivvenConnect>,
909 error: &OperatorError,
910 _ctx: Arc<ConnectControllerContext>,
911) -> Action {
912 warn!(
913 error = %error,
914 "Connect reconciliation error, will retry"
915 );
916
917 let delay = error
918 .requeue_delay()
919 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
920
921 Action::requeue(delay)
922}
923
// Unit tests for the pure builder functions of the connect controller. None of
// them talk to a Kubernetes API server.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
        SourceConnectorSpec, SourceTopicConfigSpec,
    };

    // Fixture: a `RivvenConnect` named `test-connect` in `default` with two desired
    // replicas, one enabled `datagen` source and one enabled `stdout` sink, and no
    // status yet.
    fn create_test_connect() -> RivvenConnect {
        RivvenConnect {
            metadata: ObjectMeta {
                name: Some("test-connect".to_string()),
                namespace: Some("default".to_string()),
                // A UID is needed so an owner reference can be derived.
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenConnectSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                replicas: 2,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                resources: None,
                config: ConnectConfigSpec::default(),
                sources: vec![SourceConnectorSpec {
                    name: "test-source".to_string(),
                    connector: "datagen".to_string(),
                    topic: "test-topic".to_string(),
                    topic_routing: None,
                    enabled: true,
                    postgres_cdc: None,
                    mysql_cdc: None,
                    http: None,
                    datagen: None,
                    kafka: None,
                    mqtt: None,
                    sqs: None,
                    pubsub: None,
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    topic_config: SourceTopicConfigSpec::default(),
                }],
                sinks: vec![SinkConnectorSpec {
                    name: "test-sink".to_string(),
                    connector: "stdout".to_string(),
                    topics: vec!["test-topic".to_string()],
                    consumer_group: "test-group".to_string(),
                    enabled: true,
                    start_offset: "latest".to_string(),
                    s3: None,
                    http: None,
                    stdout: None,
                    kafka: None,
                    gcs: None,
                    azure_blob: None,
                    snowflake: None,
                    bigquery: None,
                    redshift: None,
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    rate_limit: RateLimitSpec::default(),
                }],
                settings: GlobalConnectSettings::default(),
                tls: ConnectTlsSpec::default(),
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                affinity: None,
                service_account: None,
                security_context: None,
                container_security_context: None,
            },
            status: None,
        }
    }

    // Without any Deployment status, the phase is Pending and replica counts are zero.
    #[test]
    fn test_build_connect_status_pending() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, None);

        assert_eq!(status.phase, ConnectPhase::Pending);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.sources_total, 1);
        assert_eq!(status.sinks_total, 1);
    }

    // All desired replicas ready: phase is Running and enabled connectors count as running.
    #[test]
    fn test_build_connect_status_running() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Running);
        assert_eq!(status.replicas, 2);
        assert_eq!(status.ready_replicas, 2);
        assert_eq!(status.sources_running, 1);
        assert_eq!(status.sinks_running, 1);
    }

    // One of two replicas ready while all are updated: phase is Degraded.
    #[test]
    fn test_build_connect_status_degraded() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(1),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Degraded);
    }

    // The failed status carries the error message and a single Ready=False condition.
    #[test]
    fn test_build_failed_status() {
        let connect = create_test_connect();
        let status = build_failed_status(&connect, "Test error");

        assert_eq!(status.phase, ConnectPhase::Failed);
        assert_eq!(status.message, Some("Test error".to_string()));
        assert_eq!(status.conditions.len(), 1);
        assert_eq!(status.conditions[0].status, "False");
    }

    // The rendered pipeline contains the version, broker, source and sink sections.
    #[test]
    fn test_build_pipeline_yaml() {
        let connect = create_test_connect();
        let yaml = build_pipeline_yaml(&connect.spec).unwrap();

        assert!(yaml.contains("version: \"1.0\""));
        assert!(yaml.contains("bootstrap_servers:"));
        assert!(yaml.contains("rivven-test-cluster-client"));
        assert!(yaml.contains("sources:"));
        assert!(yaml.contains("test-source:"));
        assert!(yaml.contains("sinks:"));
        assert!(yaml.contains("test-sink:"));
    }

    // The ConfigMap uses the `rivven-connect-<name>` naming and holds `pipeline.yaml`.
    #[test]
    fn test_build_connect_configmap() {
        let connect = create_test_connect();
        let cm = build_connect_configmap(&connect).unwrap();

        assert_eq!(
            cm.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert!(cm.data.unwrap().contains_key("pipeline.yaml"));
    }

    // The Deployment uses the `rivven-connect-<name>` naming and the spec's replica count.
    #[test]
    fn test_build_connect_deployment() {
        let connect = create_test_connect();
        let deployment = build_connect_deployment(&connect).unwrap();

        assert_eq!(
            deployment.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(2));
    }

    // The Service uses the `rivven-connect-<name>` naming and exposes two ports.
    #[test]
    fn test_build_connect_service() {
        let connect = create_test_connect();
        let service = build_connect_service(&connect);

        assert_eq!(
            service.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2);
    }

    // With a fully ready deployment, both connectors are reported as running with
    // the correct connector type.
    #[test]
    fn test_connector_statuses() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.connector_statuses.len(), 2);

        let source_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-source")
            .unwrap();
        assert_eq!(source_status.connector_type, "source");
        assert_eq!(source_status.state, "running");

        let sink_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-sink")
            .unwrap();
        assert_eq!(sink_status.connector_type, "sink");
        assert_eq!(sink_status.state, "running");
    }

    // With a fully ready deployment, four conditions are emitted and both Ready and
    // BrokerConnected are True.
    #[test]
    fn test_conditions() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");

        let broker_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "BrokerConnected")
            .unwrap();
        assert_eq!(broker_cond.status, "True");
    }
}