1use crate::crd::{
7 ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8 RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer attached to every `RivvenConnect` so that `cleanup_connect` runs before the
/// object is removed from the API server.
pub const CONNECT_FINALIZER: &str = "rivven.hupe1980.github.io/connect-finalizer";

/// Requeue interval after a successful reconcile, in seconds.
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Fallback requeue interval after a failed reconcile (used when the error carries no
/// specific delay), in seconds.
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
36pub struct ConnectControllerContext {
38 pub client: Client,
40 pub metrics: Option<ConnectControllerMetrics>,
42}
43
44#[derive(Clone)]
46pub struct ConnectControllerMetrics {
47 pub reconciliations: metrics::Counter,
49 pub errors: metrics::Counter,
51 pub duration: metrics::Histogram,
53}
54
55impl ConnectControllerMetrics {
56 pub fn new() -> Self {
58 Self {
59 reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60 errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61 duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62 }
63 }
64}
65
66impl Default for ConnectControllerMetrics {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74 let connects: Api<RivvenConnect> = match &namespace {
75 Some(ns) => Api::namespaced(client.clone(), ns),
76 None => Api::all(client.clone()),
77 };
78
79 let ctx = Arc::new(ConnectControllerContext {
80 client: client.clone(),
81 metrics: Some(ConnectControllerMetrics::new()),
82 });
83
84 info!(
85 namespace = namespace.as_deref().unwrap_or("all"),
86 "Starting RivvenConnect controller"
87 );
88
89 let deployments = match &namespace {
91 Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92 None => Api::<Deployment>::all(client.clone()),
93 };
94
95 let configmaps = match &namespace {
96 Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97 None => Api::<ConfigMap>::all(client.clone()),
98 };
99
100 Controller::new(connects.clone(), Config::default())
101 .owns(deployments, Config::default())
102 .owns(configmaps, Config::default())
103 .run(reconcile_connect, connect_error_policy, ctx)
104 .for_each(|result| async move {
105 match result {
106 Ok((obj, action)) => {
107 debug!(
108 name = obj.name,
109 namespace = obj.namespace,
110 ?action,
111 "Connect reconciliation completed"
112 );
113 }
114 Err(e) => {
115 error!(error = %e, "Connect reconciliation failed");
116 }
117 }
118 })
119 .await;
120
121 Ok(())
122}
123
124#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127 connect: Arc<RivvenConnect>,
128 ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130 let start = std::time::Instant::now();
131
132 if let Some(ref metrics) = ctx.metrics {
133 metrics.reconciliations.increment(1);
134 }
135
136 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137 let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139 let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140 match event {
141 FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142 FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143 }
144 })
145 .await;
146
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.duration.record(start.elapsed().as_secs_f64());
149 }
150
151 result.map_err(|e| {
152 if let Some(ref metrics) = ctx.metrics {
153 metrics.errors.increment(1);
154 }
155 OperatorError::ReconcileFailed(e.to_string())
156 })
157}
158
159#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162 connect: Arc<RivvenConnect>,
163 ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165 let name = connect.name_any();
166 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168 info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170 if let Err(errors) = connect.spec.validate() {
172 let error_messages: Vec<String> = errors
173 .field_errors()
174 .iter()
175 .flat_map(|(field, errs)| {
176 errs.iter()
177 .map(move |e| format!("{}: {:?}", field, e.message))
178 })
179 .collect();
180 let error_msg = error_messages.join("; ");
181 warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183 update_connect_status(
185 &ctx.client,
186 &namespace,
187 &name,
188 build_failed_status(&connect, &error_msg),
189 )
190 .await?;
191
192 return Err(OperatorError::InvalidConfig(error_msg));
193 }
194
195 verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198 let configmap = build_connect_configmap(&connect)?;
200 apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202 let deployment = build_connect_deployment(&connect)?;
204 let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206 let service = build_connect_service(&connect)?;
208 apply_connect_service(&ctx.client, &namespace, service).await?;
209
210 let status = build_connect_status(&connect, deploy_status);
212 update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214 info!(name = %name, "Connect reconciliation complete");
215
216 Ok(Action::requeue(Duration::from_secs(
217 DEFAULT_REQUEUE_SECONDS,
218 )))
219}
220
221async fn verify_cluster_ref(
223 client: &Client,
224 namespace: &str,
225 cluster_ref: &ClusterReference,
226) -> Result<()> {
227 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230 match clusters.get(&cluster_ref.name).await {
231 Ok(_) => Ok(()),
232 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233 format!("{}/{}", cluster_ns, cluster_ref.name),
234 )),
235 Err(e) => Err(OperatorError::from(e)),
236 }
237}
238
239fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241 let name = connect.name_any();
242 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244 let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247 let mut labels = BTreeMap::new();
248 labels.insert(
249 "app.kubernetes.io/name".to_string(),
250 "rivven-connect".to_string(),
251 );
252 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253 labels.insert(
254 "app.kubernetes.io/component".to_string(),
255 "connect".to_string(),
256 );
257 labels.insert(
258 "app.kubernetes.io/managed-by".to_string(),
259 "rivven-operator".to_string(),
260 );
261
262 let mut data = BTreeMap::new();
263 data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265 Ok(ConfigMap {
266 metadata: ObjectMeta {
267 name: Some(format!("rivven-connect-{}", name)),
268 namespace: Some(namespace),
269 labels: Some(labels),
270 owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
271 || {
272 OperatorError::InvalidConfig(
273 "Failed to generate owner reference for ConfigMap".into(),
274 )
275 },
276 )?]),
277 ..Default::default()
278 },
279 data: Some(data),
280 ..Default::default()
281 })
282}
283
284fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
291 let mut pipeline = serde_json::Map::new();
292 pipeline.insert("version".to_string(), serde_json::json!("1.0"));
293
294 let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
296 let cluster_svc = format!(
297 "rivven-{}.{}.svc.cluster.local:9092",
298 spec.cluster_ref.name, cluster_ns
299 );
300 pipeline.insert(
301 "broker".to_string(),
302 serde_json::json!({ "bootstrap_servers": [cluster_svc] }),
303 );
304
305 if !spec.sources.is_empty() || !spec.sinks.is_empty() {
307 pipeline.insert(
308 "settings".to_string(),
309 serde_json::json!({
310 "topic": {
311 "auto_create": spec.settings.topic.auto_create,
312 "default_partitions": spec.settings.topic.default_partitions
313 }
314 }),
315 );
316 }
317
318 if !spec.sources.is_empty() {
320 let mut sources = serde_json::Map::new();
321 for source in &spec.sources {
322 let mut entry = serde_json::Map::new();
323 entry.insert("connector".to_string(), serde_json::json!(source.connector));
324 entry.insert("topic".to_string(), serde_json::json!(source.topic));
325 entry.insert("enabled".to_string(), serde_json::json!(source.enabled));
326
327 let mut config = source.config.clone();
329 if let Some(ref routing) = source.topic_routing {
330 if let serde_json::Value::Object(ref mut map) = config {
331 map.insert("topic_routing".to_string(), serde_json::json!(routing));
332 } else if config.is_null() {
333 config = serde_json::json!({"topic_routing": routing});
334 }
335 }
336
337 if !config.is_null() {
338 entry.insert("config".to_string(), config);
339 }
340
341 sources.insert(source.name.clone(), serde_json::Value::Object(entry));
342 }
343 pipeline.insert("sources".to_string(), serde_json::Value::Object(sources));
344 }
345
346 if !spec.sinks.is_empty() {
348 let mut sinks = serde_json::Map::new();
349 for sink in &spec.sinks {
350 let mut entry = serde_json::Map::new();
351 entry.insert("connector".to_string(), serde_json::json!(sink.connector));
352 entry.insert("topics".to_string(), serde_json::json!(sink.topics));
353 entry.insert(
354 "consumer_group".to_string(),
355 serde_json::json!(sink.consumer_group),
356 );
357 entry.insert("enabled".to_string(), serde_json::json!(sink.enabled));
358
359 if !sink.config.is_null() {
360 entry.insert("config".to_string(), sink.config.clone());
361 }
362
363 sinks.insert(sink.name.clone(), serde_json::Value::Object(entry));
364 }
365 pipeline.insert("sinks".to_string(), serde_json::Value::Object(sinks));
366 }
367
368 serde_yaml::to_string(&serde_json::Value::Object(pipeline)).map_err(|e| {
369 OperatorError::InvalidConfig(format!("failed to serialize pipeline YAML: {e}"))
370 })
371}
372
373fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
375 let name = connect.name_any();
376 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
377 let spec = &connect.spec;
378
379 let selector_labels: BTreeMap<String, String> = [
381 (
382 "app.kubernetes.io/name".to_string(),
383 "rivven-connect".to_string(),
384 ),
385 ("app.kubernetes.io/instance".to_string(), name.clone()),
386 ]
387 .into();
388
389 let mut labels = BTreeMap::new();
390 labels.insert(
391 "app.kubernetes.io/name".to_string(),
392 "rivven-connect".to_string(),
393 );
394 labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
395 labels.insert(
396 "app.kubernetes.io/component".to_string(),
397 "connect".to_string(),
398 );
399 labels.insert(
400 "app.kubernetes.io/managed-by".to_string(),
401 "rivven-operator".to_string(),
402 );
403
404 for (k, v) in &spec.pod_labels {
406 if !k.starts_with("app.kubernetes.io/") {
407 labels.insert(k.clone(), v.clone());
408 }
409 }
410
411 let image = spec
412 .image
413 .clone()
414 .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
415
416 let container = k8s_openapi::api::core::v1::Container {
418 name: "connect".to_string(),
419 image: Some(image),
420 image_pull_policy: Some(spec.image_pull_policy.clone()),
421 args: Some(vec![
422 "run".to_string(),
423 "--config".to_string(),
424 "/config/pipeline.yaml".to_string(),
425 ]),
426 env: Some(spec.env.clone()),
427 volume_mounts: Some(vec![
428 k8s_openapi::api::core::v1::VolumeMount {
429 name: "config".to_string(),
430 mount_path: "/config".to_string(),
431 read_only: Some(true),
432 ..Default::default()
433 },
434 k8s_openapi::api::core::v1::VolumeMount {
435 name: "data".to_string(),
436 mount_path: "/data".to_string(),
437 ..Default::default()
438 },
439 ]),
440 resources: spec
441 .resources
442 .as_ref()
443 .map(|r| serde_json::from_value(r.clone()))
444 .transpose()
445 .map_err(|e| {
446 OperatorError::InvalidConfig(format!("invalid resources config: {}", e))
447 })?,
448 security_context: {
449 let custom = spec
450 .container_security_context
451 .as_ref()
452 .map(|sc| serde_json::from_value(sc.clone()))
453 .transpose()
454 .map_err(|e| {
455 OperatorError::InvalidConfig(format!(
456 "invalid container security context: {}",
457 e
458 ))
459 })?;
460 Some(
461 custom.unwrap_or_else(|| k8s_openapi::api::core::v1::SecurityContext {
462 allow_privilege_escalation: Some(false),
463 read_only_root_filesystem: Some(true),
464 run_as_non_root: Some(true),
465 run_as_user: Some(1000),
466 run_as_group: Some(1000),
467 capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
468 drop: Some(vec!["ALL".to_string()]),
469 ..Default::default()
470 }),
471 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
472 type_: "RuntimeDefault".to_string(),
473 ..Default::default()
474 }),
475 ..Default::default()
476 }),
477 )
478 },
479 liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
480 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
481 path: Some("/health".to_string()),
482 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
483 ..Default::default()
484 }),
485 initial_delay_seconds: Some(30),
486 period_seconds: Some(10),
487 ..Default::default()
488 }),
489 readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
490 http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
491 path: Some("/ready".to_string()),
492 port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
493 ..Default::default()
494 }),
495 initial_delay_seconds: Some(10),
496 period_seconds: Some(5),
497 ..Default::default()
498 }),
499 ports: Some(vec![
500 k8s_openapi::api::core::v1::ContainerPort {
501 name: Some("http".to_string()),
502 container_port: 8080,
503 ..Default::default()
504 },
505 k8s_openapi::api::core::v1::ContainerPort {
506 name: Some("metrics".to_string()),
507 container_port: 9090,
508 ..Default::default()
509 },
510 ]),
511 ..Default::default()
512 };
513
514 let volumes = vec![
516 k8s_openapi::api::core::v1::Volume {
517 name: "config".to_string(),
518 config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
519 name: format!("rivven-connect-{}", name),
520 ..Default::default()
521 }),
522 ..Default::default()
523 },
524 k8s_openapi::api::core::v1::Volume {
525 name: "data".to_string(),
526 empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
527 ..Default::default()
528 },
529 ];
530
531 let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
533 None
534 } else {
535 Some(
536 spec.image_pull_secrets
537 .iter()
538 .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
539 .collect(),
540 )
541 };
542
543 let pod_spec = k8s_openapi::api::core::v1::PodSpec {
544 containers: vec![container],
545 volumes: Some(volumes),
546 image_pull_secrets,
547 service_account_name: spec.service_account.clone(),
548 node_selector: if spec.node_selector.is_empty() {
549 None
550 } else {
551 Some(spec.node_selector.clone())
552 },
553 tolerations: if spec.tolerations.is_empty() {
554 None
555 } else {
556 Some(spec.tolerations.clone())
557 },
558 affinity: spec
559 .affinity
560 .as_ref()
561 .map(|a| serde_json::from_value(a.clone()))
562 .transpose()
563 .map_err(|e| OperatorError::InvalidConfig(format!("invalid affinity config: {}", e)))?,
564 security_context: {
565 let custom = spec
566 .security_context
567 .as_ref()
568 .map(|sc| serde_json::from_value(sc.clone()))
569 .transpose()
570 .map_err(|e| {
571 OperatorError::InvalidConfig(format!("invalid pod security context: {}", e))
572 })?;
573 Some(
574 custom.unwrap_or_else(|| k8s_openapi::api::core::v1::PodSecurityContext {
575 run_as_non_root: Some(true),
576 run_as_user: Some(1000),
577 run_as_group: Some(1000),
578 fs_group: Some(1000),
579 seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
580 type_: "RuntimeDefault".to_string(),
581 ..Default::default()
582 }),
583 ..Default::default()
584 }),
585 )
586 },
587 automount_service_account_token: Some(false),
588 ..Default::default()
589 };
590
591 Ok(Deployment {
592 metadata: ObjectMeta {
593 name: Some(format!("rivven-connect-{}", name)),
594 namespace: Some(namespace),
595 labels: Some(labels.clone()),
596 owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
597 || {
598 OperatorError::InvalidConfig(
599 "Failed to generate owner reference for Deployment".into(),
600 )
601 },
602 )?]),
603 ..Default::default()
604 },
605 spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
606 replicas: Some(spec.replicas),
607 selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
608 match_labels: Some(selector_labels),
609 ..Default::default()
610 },
611 template: k8s_openapi::api::core::v1::PodTemplateSpec {
612 metadata: Some(ObjectMeta {
613 labels: Some(labels),
614 annotations: Some(spec.pod_annotations.clone()),
615 ..Default::default()
616 }),
617 spec: Some(pod_spec),
618 },
619 ..Default::default()
620 }),
621 ..Default::default()
622 })
623}
624
625fn build_connect_service(connect: &RivvenConnect) -> Result<Service> {
627 let name = connect.name_any();
628 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
629
630 let selector_labels: BTreeMap<String, String> = [
632 (
633 "app.kubernetes.io/name".to_string(),
634 "rivven-connect".to_string(),
635 ),
636 ("app.kubernetes.io/instance".to_string(), name.clone()),
637 ]
638 .into();
639
640 let mut labels = selector_labels.clone();
641 labels.insert(
642 "app.kubernetes.io/component".to_string(),
643 "connect".to_string(),
644 );
645 labels.insert(
646 "app.kubernetes.io/managed-by".to_string(),
647 "rivven-operator".to_string(),
648 );
649
650 Ok(Service {
651 metadata: ObjectMeta {
652 name: Some(format!("rivven-connect-{}", name)),
653 namespace: Some(namespace),
654 labels: Some(labels),
655 owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
656 || {
657 OperatorError::InvalidConfig(
658 "Failed to generate owner reference for Service".into(),
659 )
660 },
661 )?]),
662 ..Default::default()
663 },
664 spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
665 selector: Some(selector_labels),
666 ports: Some(vec![
667 k8s_openapi::api::core::v1::ServicePort {
668 name: Some("http".to_string()),
669 port: 8080,
670 target_port: Some(
671 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
672 ),
673 ..Default::default()
674 },
675 k8s_openapi::api::core::v1::ServicePort {
676 name: Some("metrics".to_string()),
677 port: 9090,
678 target_port: Some(
679 k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
680 ),
681 ..Default::default()
682 },
683 ]),
684 ..Default::default()
685 }),
686 ..Default::default()
687 })
688}
689
690fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
696 let labels = existing.meta().labels.as_ref();
697 let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
698 match managed_by {
699 Some(manager) if manager != "rivven-operator" => {
700 let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
701 Err(OperatorError::InvalidConfig(format!(
702 "resource '{}' is managed by '{}', not rivven-operator; \
703 refusing to force-apply to avoid ownership conflict",
704 name, manager
705 )))
706 }
707 _ => Ok(()),
708 }
709}
710
711async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
713 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
714 let name =
715 cm.metadata.name.as_ref().ok_or_else(|| {
716 OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
717 })?;
718
719 debug!(name = %name, "Applying Connect ConfigMap");
720
721 if let Ok(existing) = api.get(name).await {
723 verify_ownership(&existing)?;
724 }
725
726 let patch_params = PatchParams::apply("rivven-operator").force();
727 api.patch(name, &patch_params, &Patch::Apply(&cm))
728 .await
729 .map_err(OperatorError::from)?;
730
731 Ok(())
732}
733
734async fn apply_connect_deployment(
736 client: &Client,
737 namespace: &str,
738 deployment: Deployment,
739) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
740 let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
741 let name =
742 deployment.metadata.name.as_ref().ok_or_else(|| {
743 OperatorError::InvalidConfig("Deployment missing metadata.name".into())
744 })?;
745
746 debug!(name = %name, "Applying Connect Deployment");
747
748 if let Ok(existing) = api.get(name).await {
750 verify_ownership(&existing)?;
751 }
752
753 let patch_params = PatchParams::apply("rivven-operator").force();
754 let result = api
755 .patch(name, &patch_params, &Patch::Apply(&deployment))
756 .await
757 .map_err(OperatorError::from)?;
758
759 Ok(result.status)
760}
761
762async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
764 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
765 let name = svc
766 .metadata
767 .name
768 .as_ref()
769 .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
770
771 debug!(name = %name, "Applying Connect Service");
772
773 if let Ok(existing) = api.get(name).await {
775 verify_ownership(&existing)?;
776 }
777
778 let patch_params = PatchParams::apply("rivven-operator").force();
779 api.patch(name, &patch_params, &Patch::Apply(&svc))
780 .await
781 .map_err(OperatorError::from)?;
782
783 Ok(())
784}
785
786fn build_connect_status(
788 connect: &RivvenConnect,
789 deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
790) -> RivvenConnectStatus {
791 let now = Utc::now().to_rfc3339();
792 let spec = &connect.spec;
793
794 let (replicas, ready_replicas, updated_replicas) = deploy_status
795 .map(|s| {
796 (
797 s.replicas.unwrap_or(0),
798 s.ready_replicas.unwrap_or(0),
799 s.updated_replicas.unwrap_or(0),
800 )
801 })
802 .unwrap_or((0, 0, 0));
803
804 let desired_replicas = spec.replicas;
805
806 let phase = if ready_replicas == 0 {
808 ConnectPhase::Pending
809 } else if ready_replicas < desired_replicas {
810 if updated_replicas < desired_replicas {
811 ConnectPhase::Starting
812 } else {
813 ConnectPhase::Degraded
814 }
815 } else {
816 ConnectPhase::Running
817 };
818
819 let sources_total = spec.sources.len() as i32;
821 let sinks_total = spec.sinks.len() as i32;
822
823 let sources_running = if phase == ConnectPhase::Running {
825 spec.sources.iter().filter(|s| s.enabled).count() as i32
826 } else {
827 0
828 };
829 let sinks_running = if phase == ConnectPhase::Running {
830 spec.sinks.iter().filter(|s| s.enabled).count() as i32
831 } else {
832 0
833 };
834
835 let mut conditions = vec![];
837
838 conditions.push(ConnectCondition {
839 condition_type: "Ready".to_string(),
840 status: if ready_replicas >= desired_replicas {
841 "True".to_string()
842 } else {
843 "False".to_string()
844 },
845 reason: Some(format!(
846 "{}/{} replicas ready",
847 ready_replicas, desired_replicas
848 )),
849 message: None,
850 last_transition_time: Some(now.clone()),
851 });
852
853 conditions.push(ConnectCondition {
854 condition_type: "BrokerConnected".to_string(),
855 status: if phase == ConnectPhase::Running {
856 "True".to_string()
857 } else {
858 "Unknown".to_string()
859 },
860 reason: Some("ClusterRefValid".to_string()),
861 message: None,
862 last_transition_time: Some(now.clone()),
863 });
864
865 conditions.push(ConnectCondition {
866 condition_type: "SourcesHealthy".to_string(),
867 status: if sources_running == sources_total && sources_total > 0 {
868 "True".to_string()
869 } else if sources_running > 0 {
870 "Partial".to_string()
871 } else if sources_total == 0 {
872 "N/A".to_string()
873 } else {
874 "False".to_string()
875 },
876 reason: Some(format!(
877 "{}/{} sources running",
878 sources_running, sources_total
879 )),
880 message: None,
881 last_transition_time: Some(now.clone()),
882 });
883
884 conditions.push(ConnectCondition {
885 condition_type: "SinksHealthy".to_string(),
886 status: if sinks_running == sinks_total && sinks_total > 0 {
887 "True".to_string()
888 } else if sinks_running > 0 {
889 "Partial".to_string()
890 } else if sinks_total == 0 {
891 "N/A".to_string()
892 } else {
893 "False".to_string()
894 },
895 reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
896 message: None,
897 last_transition_time: Some(now.clone()),
898 });
899
900 let mut connector_statuses = Vec::new();
902
903 for source in &spec.sources {
904 connector_statuses.push(ConnectorStatus {
905 name: source.name.clone(),
906 connector_type: "source".to_string(),
907 kind: source.connector.clone(),
908 state: if source.enabled && phase == ConnectPhase::Running {
909 "running".to_string()
910 } else if !source.enabled {
911 "disabled".to_string()
912 } else {
913 "pending".to_string()
914 },
915 status_source: "synthetic".to_string(),
916 last_probed: None,
917 events_processed: 0, last_error: None,
919 last_success_time: None,
920 });
921 }
922
923 for sink in &spec.sinks {
924 connector_statuses.push(ConnectorStatus {
925 name: sink.name.clone(),
926 connector_type: "sink".to_string(),
927 kind: sink.connector.clone(),
928 state: if sink.enabled && phase == ConnectPhase::Running {
929 "running".to_string()
930 } else if !sink.enabled {
931 "disabled".to_string()
932 } else {
933 "pending".to_string()
934 },
935 status_source: "synthetic".to_string(),
936 last_probed: None,
937 events_processed: 0,
938 last_error: None,
939 last_success_time: None,
940 });
941 }
942
943 RivvenConnectStatus {
944 phase,
945 replicas,
946 ready_replicas,
947 sources_running,
948 sinks_running,
949 sources_total,
950 sinks_total,
951 observed_generation: connect.metadata.generation.unwrap_or(0),
952 conditions,
953 connector_statuses,
954 last_updated: Some(now),
955 message: None,
956 }
957}
958
959fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
961 let now = Utc::now().to_rfc3339();
962
963 RivvenConnectStatus {
964 phase: ConnectPhase::Failed,
965 replicas: 0,
966 ready_replicas: 0,
967 sources_running: 0,
968 sinks_running: 0,
969 sources_total: connect.spec.sources.len() as i32,
970 sinks_total: connect.spec.sinks.len() as i32,
971 observed_generation: connect.metadata.generation.unwrap_or(0),
972 conditions: vec![ConnectCondition {
973 condition_type: "Ready".to_string(),
974 status: "False".to_string(),
975 reason: Some("ValidationFailed".to_string()),
976 message: Some(error_msg.to_string()),
977 last_transition_time: Some(now.clone()),
978 }],
979 connector_statuses: vec![],
980 last_updated: Some(now),
981 message: Some(error_msg.to_string()),
982 }
983}
984
985async fn update_connect_status(
987 client: &Client,
988 namespace: &str,
989 name: &str,
990 status: RivvenConnectStatus,
991) -> Result<()> {
992 let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
993
994 debug!(name = %name, phase = ?status.phase, "Updating connect status");
995
996 let patch = serde_json::json!({
997 "status": status
998 });
999
1000 let patch_params = PatchParams::default();
1001 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
1002 .await
1003 .map_err(OperatorError::from)?;
1004
1005 Ok(())
1006}
1007
1008#[instrument(skip(connect, ctx))]
1010async fn cleanup_connect(
1011 connect: Arc<RivvenConnect>,
1012 ctx: Arc<ConnectControllerContext>,
1013) -> Result<Action> {
1014 let name = connect.name_any();
1015 let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
1016
1017 info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
1018
1019 let has_sinks = !connect.spec.sinks.is_empty();
1022 if has_sinks {
1023 info!(name = %name, "Sink connectors detected — setting Terminating status for final offset commit");
1024 let draining_status = RivvenConnectStatus {
1025 phase: ConnectPhase::Terminating,
1026 replicas: 0,
1027 ready_replicas: 0,
1028 sources_running: 0,
1029 sinks_running: 0,
1030 sources_total: connect.spec.sources.len() as i32,
1031 sinks_total: connect.spec.sinks.len() as i32,
1032 observed_generation: connect.metadata.generation.unwrap_or(0),
1033 conditions: vec![],
1034 connector_statuses: vec![],
1035 last_updated: Some(Utc::now().to_rfc3339()),
1036 message: Some("Draining sink connectors before deletion".to_string()),
1037 };
1038 if let Err(e) = update_connect_status(&ctx.client, &namespace, &name, draining_status).await
1039 {
1040 warn!(name = %name, error = %e, "Failed to set Terminating status — continuing cleanup");
1041 }
1042 }
1043
1044 let cm_name = format!("rivven-connect-{}", name);
1046 let configmaps: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &namespace);
1047 match configmaps.delete(&cm_name, &Default::default()).await {
1048 Ok(_) => info!(name = %name, configmap = %cm_name, "Deleted operator-owned ConfigMap"),
1049 Err(kube::Error::Api(ae)) if ae.code == 404 => {
1050 debug!(name = %name, configmap = %cm_name, "ConfigMap already deleted");
1051 }
1052 Err(e) => {
1053 warn!(name = %name, configmap = %cm_name, error = %e, "Failed to delete ConfigMap — continuing cleanup");
1054 }
1055 }
1056
1057 let deploy_name = format!("rivven-connect-{}", name);
1059 let deployments: Api<Deployment> = Api::namespaced(ctx.client.clone(), &namespace);
1060 match deployments.delete(&deploy_name, &Default::default()).await {
1061 Ok(_) => {
1062 info!(name = %name, deployment = %deploy_name, "Deleted operator-owned Deployment")
1063 }
1064 Err(kube::Error::Api(ae)) if ae.code == 404 => {
1065 debug!(name = %name, deployment = %deploy_name, "Deployment already deleted");
1066 }
1067 Err(e) => {
1068 warn!(name = %name, deployment = %deploy_name, error = %e, "Failed to delete Deployment — continuing cleanup");
1069 }
1070 }
1071
1072 let svc_name = format!("rivven-connect-{}", name);
1074 let services: Api<Service> = Api::namespaced(ctx.client.clone(), &namespace);
1075 match services.delete(&svc_name, &Default::default()).await {
1076 Ok(_) => info!(name = %name, service = %svc_name, "Deleted operator-owned Service"),
1077 Err(kube::Error::Api(ae)) if ae.code == 404 => {
1078 debug!(name = %name, service = %svc_name, "Service already deleted");
1079 }
1080 Err(e) => {
1081 warn!(name = %name, service = %svc_name, error = %e, "Failed to delete Service — continuing cleanup");
1082 }
1083 }
1084
1085 info!(name = %name, "Connect cleanup complete");
1086
1087 Ok(Action::await_change())
1088}
1089
1090fn connect_error_policy(
1092 _connect: Arc<RivvenConnect>,
1093 error: &OperatorError,
1094 _ctx: Arc<ConnectControllerContext>,
1095) -> Action {
1096 warn!(
1097 error = %error,
1098 "Connect reconciliation error, will retry"
1099 );
1100
1101 let delay = error
1102 .requeue_delay()
1103 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
1104
1105 Action::requeue(delay)
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111 use crate::crd::{
1112 ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
1113 SourceConnectorSpec, SourceTopicConfigSpec,
1114 };
1115
1116 fn create_test_connect() -> RivvenConnect {
1117 RivvenConnect {
1118 metadata: ObjectMeta {
1119 name: Some("test-connect".to_string()),
1120 namespace: Some("default".to_string()),
1121 uid: Some("test-uid".to_string()),
1122 generation: Some(1),
1123 ..Default::default()
1124 },
1125 spec: RivvenConnectSpec {
1126 cluster_ref: ClusterReference {
1127 name: "test-cluster".to_string(),
1128 namespace: None,
1129 },
1130 replicas: 2,
1131 version: "0.0.1".to_string(),
1132 image: None,
1133 image_pull_policy: "IfNotPresent".to_string(),
1134 image_pull_secrets: vec![],
1135 resources: None,
1136 config: ConnectConfigSpec::default(),
1137 sources: vec![SourceConnectorSpec {
1138 name: "test-source".to_string(),
1139 connector: "datagen".to_string(),
1140 topic: "test-topic".to_string(),
1141 topic_routing: None,
1142 enabled: true,
1143 config: serde_json::Value::Null,
1144 config_secret_ref: None,
1145 topic_config: SourceTopicConfigSpec::default(),
1146 }],
1147 sinks: vec![SinkConnectorSpec {
1148 name: "test-sink".to_string(),
1149 connector: "stdout".to_string(),
1150 topics: vec!["test-topic".to_string()],
1151 consumer_group: "test-group".to_string(),
1152 enabled: true,
1153 start_offset: "latest".to_string(),
1154 config: serde_json::Value::Null,
1155 config_secret_ref: None,
1156 rate_limit: RateLimitSpec::default(),
1157 }],
1158 settings: GlobalConnectSettings::default(),
1159 tls: ConnectTlsSpec::default(),
1160 pod_annotations: BTreeMap::new(),
1161 pod_labels: BTreeMap::new(),
1162 env: vec![],
1163 node_selector: BTreeMap::new(),
1164 tolerations: vec![],
1165 affinity: None,
1166 service_account: None,
1167 security_context: None,
1168 container_security_context: None,
1169 },
1170 status: None,
1171 }
1172 }
1173
1174 #[test]
1175 fn test_build_connect_status_pending() {
1176 let connect = create_test_connect();
1177 let status = build_connect_status(&connect, None);
1178
1179 assert_eq!(status.phase, ConnectPhase::Pending);
1180 assert_eq!(status.replicas, 0);
1181 assert_eq!(status.ready_replicas, 0);
1182 assert_eq!(status.sources_total, 1);
1183 assert_eq!(status.sinks_total, 1);
1184 }
1185
1186 #[test]
1187 fn test_build_connect_status_running() {
1188 let connect = create_test_connect();
1189 let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
1190 replicas: Some(2),
1191 ready_replicas: Some(2),
1192 updated_replicas: Some(2),
1193 ..Default::default()
1194 };
1195
1196 let status = build_connect_status(&connect, Some(deploy_status));
1197
1198 assert_eq!(status.phase, ConnectPhase::Running);
1199 assert_eq!(status.replicas, 2);
1200 assert_eq!(status.ready_replicas, 2);
1201 assert_eq!(status.sources_running, 1);
1202 assert_eq!(status.sinks_running, 1);
1203 }
1204
1205 #[test]
1206 fn test_build_connect_status_degraded() {
1207 let connect = create_test_connect();
1208 let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
1209 replicas: Some(2),
1210 ready_replicas: Some(1),
1211 updated_replicas: Some(2),
1212 ..Default::default()
1213 };
1214
1215 let status = build_connect_status(&connect, Some(deploy_status));
1216
1217 assert_eq!(status.phase, ConnectPhase::Degraded);
1218 }
1219
1220 #[test]
1221 fn test_build_failed_status() {
1222 let connect = create_test_connect();
1223 let status = build_failed_status(&connect, "Test error");
1224
1225 assert_eq!(status.phase, ConnectPhase::Failed);
1226 assert_eq!(status.message, Some("Test error".to_string()));
1227 assert_eq!(status.conditions.len(), 1);
1228 assert_eq!(status.conditions[0].status, "False");
1229 }
1230
1231 #[test]
1232 fn test_build_pipeline_yaml() {
1233 let connect = create_test_connect();
1234 let yaml = build_pipeline_yaml(&connect.spec).unwrap();
1235
1236 assert!(yaml.contains("version:"));
1238 assert!(yaml.contains("1.0"));
1239 assert!(yaml.contains("bootstrap_servers:"));
1240 assert!(yaml.contains("rivven-test-cluster"));
1241 assert!(yaml.contains("sources:"));
1242 assert!(yaml.contains("test-source"));
1243 assert!(yaml.contains("sinks:"));
1244 assert!(yaml.contains("test-sink"));
1245 }
1246
1247 #[test]
1248 fn test_build_connect_configmap() {
1249 let connect = create_test_connect();
1250 let cm = build_connect_configmap(&connect).unwrap();
1251
1252 assert_eq!(
1253 cm.metadata.name,
1254 Some("rivven-connect-test-connect".to_string())
1255 );
1256 assert!(cm.data.unwrap().contains_key("pipeline.yaml"));
1257 }
1258
1259 #[test]
1260 fn test_build_connect_deployment() {
1261 let connect = create_test_connect();
1262 let deployment = build_connect_deployment(&connect).unwrap();
1263
1264 assert_eq!(
1265 deployment.metadata.name,
1266 Some("rivven-connect-test-connect".to_string())
1267 );
1268 assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(2));
1269 }
1270
1271 #[test]
1272 fn test_build_connect_service() {
1273 let connect = create_test_connect();
1274 let service = build_connect_service(&connect).unwrap();
1275
1276 assert_eq!(
1277 service.metadata.name,
1278 Some("rivven-connect-test-connect".to_string())
1279 );
1280 let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
1281 assert_eq!(ports.len(), 2);
1282 }
1283
1284 #[test]
1285 fn test_connector_statuses() {
1286 let connect = create_test_connect();
1287 let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
1288 replicas: Some(2),
1289 ready_replicas: Some(2),
1290 updated_replicas: Some(2),
1291 ..Default::default()
1292 };
1293
1294 let status = build_connect_status(&connect, Some(deploy_status));
1295
1296 assert_eq!(status.connector_statuses.len(), 2);
1297
1298 let source_status = status
1299 .connector_statuses
1300 .iter()
1301 .find(|s| s.name == "test-source")
1302 .unwrap();
1303 assert_eq!(source_status.connector_type, "source");
1304 assert_eq!(source_status.state, "running");
1305
1306 let sink_status = status
1307 .connector_statuses
1308 .iter()
1309 .find(|s| s.name == "test-sink")
1310 .unwrap();
1311 assert_eq!(sink_status.connector_type, "sink");
1312 assert_eq!(sink_status.state, "running");
1313 }
1314
1315 #[test]
1316 fn test_conditions() {
1317 let connect = create_test_connect();
1318 let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
1319 replicas: Some(2),
1320 ready_replicas: Some(2),
1321 updated_replicas: Some(2),
1322 ..Default::default()
1323 };
1324
1325 let status = build_connect_status(&connect, Some(deploy_status));
1326
1327 assert_eq!(status.conditions.len(), 4);
1328
1329 let ready_cond = status
1330 .conditions
1331 .iter()
1332 .find(|c| c.condition_type == "Ready")
1333 .unwrap();
1334 assert_eq!(ready_cond.status, "True");
1335
1336 let broker_cond = status
1337 .conditions
1338 .iter()
1339 .find(|c| c.condition_type == "BrokerConnected")
1340 .unwrap();
1341 assert_eq!(broker_cond.status, "True");
1342 }
1343
1344 #[test]
1345 fn test_yaml_escape_prevents_injection() {
1346 let mut connect = create_test_connect();
1350 connect.spec.sources[0].name = "evil\ninjected_key: true".to_string();
1351 let yaml = build_pipeline_yaml(&connect.spec).unwrap();
1352
1353 assert!(!yaml.contains("\ninjected_key: true\n"));
1356
1357 let parsed: serde_yaml::Value = serde_yaml::from_str(&yaml).unwrap();
1359 let sources = parsed.get("sources").unwrap().as_mapping().unwrap();
1360 assert!(sources
1362 .keys()
1363 .any(|k| k.as_str() == Some("evil\ninjected_key: true")));
1364 }
1365
1366 #[test]
1367 fn test_build_pipeline_yaml_escapes_values() {
1368 let mut connect = create_test_connect();
1369 connect.spec.sources[0].name = "evil\ninjected_key: true".to_string();
1371 let yaml = build_pipeline_yaml(&connect.spec).unwrap();
1372 let parsed: serde_yaml::Value = serde_yaml::from_str(&yaml).unwrap();
1374 assert!(parsed.get("sources").is_some());
1375 }
1376}