Skip to main content

rivven_operator/
connect_controller.rs

1//! RivvenConnect Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenConnect
4//! custom resources. It watches for changes and reconciles connector pipelines.
5
6use crate::crd::{
7    ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8    RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer attached to RivvenConnect resources so cleanup can run before deletion
pub const CONNECT_FINALIZER: &str = "rivven.hupe1980.github.io/connect-finalizer";

/// Seconds to wait before re-reconciling after a successful pass (one minute)
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Seconds to wait before retrying after an error
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
36/// Context passed to the connect controller
37pub struct ConnectControllerContext {
38    /// Kubernetes client
39    pub client: Client,
40    /// Metrics recorder
41    pub metrics: Option<ConnectControllerMetrics>,
42}
43
44/// Metrics for the connect controller
45#[derive(Clone)]
46pub struct ConnectControllerMetrics {
47    /// Counter for reconciliation attempts
48    pub reconciliations: metrics::Counter,
49    /// Counter for reconciliation errors
50    pub errors: metrics::Counter,
51    /// Histogram for reconciliation duration
52    pub duration: metrics::Histogram,
53}
54
55impl ConnectControllerMetrics {
56    /// Create new connect controller metrics
57    pub fn new() -> Self {
58        Self {
59            reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60            errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61            duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62        }
63    }
64}
65
66impl Default for ConnectControllerMetrics {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72/// Start the RivvenConnect controller
73pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74    let connects: Api<RivvenConnect> = match &namespace {
75        Some(ns) => Api::namespaced(client.clone(), ns),
76        None => Api::all(client.clone()),
77    };
78
79    let ctx = Arc::new(ConnectControllerContext {
80        client: client.clone(),
81        metrics: Some(ConnectControllerMetrics::new()),
82    });
83
84    info!(
85        namespace = namespace.as_deref().unwrap_or("all"),
86        "Starting RivvenConnect controller"
87    );
88
89    // Watch related resources for changes
90    let deployments = match &namespace {
91        Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92        None => Api::<Deployment>::all(client.clone()),
93    };
94
95    let configmaps = match &namespace {
96        Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97        None => Api::<ConfigMap>::all(client.clone()),
98    };
99
100    Controller::new(connects.clone(), Config::default())
101        .owns(deployments, Config::default())
102        .owns(configmaps, Config::default())
103        .run(reconcile_connect, connect_error_policy, ctx)
104        .for_each(|result| async move {
105            match result {
106                Ok((obj, action)) => {
107                    debug!(
108                        name = obj.name,
109                        namespace = obj.namespace,
110                        ?action,
111                        "Connect reconciliation completed"
112                    );
113                }
114                Err(e) => {
115                    error!(error = %e, "Connect reconciliation failed");
116                }
117            }
118        })
119        .await;
120
121    Ok(())
122}
123
124/// Main reconciliation function for RivvenConnect
125#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127    connect: Arc<RivvenConnect>,
128    ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130    let start = std::time::Instant::now();
131
132    if let Some(ref metrics) = ctx.metrics {
133        metrics.reconciliations.increment(1);
134    }
135
136    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137    let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139    let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140        match event {
141            FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142            FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143        }
144    })
145    .await;
146
147    if let Some(ref metrics) = ctx.metrics {
148        metrics.duration.record(start.elapsed().as_secs_f64());
149    }
150
151    result.map_err(|e| {
152        if let Some(ref metrics) = ctx.metrics {
153            metrics.errors.increment(1);
154        }
155        OperatorError::ReconcileFailed(e.to_string())
156    })
157}
158
159/// Apply (create/update) the connect resources
160#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162    connect: Arc<RivvenConnect>,
163    ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165    let name = connect.name_any();
166    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168    info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170    // Validate the connect spec
171    if let Err(errors) = connect.spec.validate() {
172        let error_messages: Vec<String> = errors
173            .field_errors()
174            .iter()
175            .flat_map(|(field, errs)| {
176                errs.iter()
177                    .map(move |e| format!("{}: {:?}", field, e.message))
178            })
179            .collect();
180        let error_msg = error_messages.join("; ");
181        warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183        // Update status to failed
184        update_connect_status(
185            &ctx.client,
186            &namespace,
187            &name,
188            build_failed_status(&connect, &error_msg),
189        )
190        .await?;
191
192        return Err(OperatorError::InvalidConfig(error_msg));
193    }
194
195    // Verify cluster reference exists
196    verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198    // Build and apply ConfigMap with pipeline configuration
199    let configmap = build_connect_configmap(&connect)?;
200    apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202    // Build and apply Deployment
203    let deployment = build_connect_deployment(&connect)?;
204    let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206    // Build and apply headless Service for connect workers
207    let service = build_connect_service(&connect);
208    apply_connect_service(&ctx.client, &namespace, service).await?;
209
210    // Update connect status
211    let status = build_connect_status(&connect, deploy_status);
212    update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214    info!(name = %name, "Connect reconciliation complete");
215
216    Ok(Action::requeue(Duration::from_secs(
217        DEFAULT_REQUEUE_SECONDS,
218    )))
219}
220
221/// Verify the referenced RivvenCluster exists
222async fn verify_cluster_ref(
223    client: &Client,
224    namespace: &str,
225    cluster_ref: &ClusterReference,
226) -> Result<()> {
227    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230    match clusters.get(&cluster_ref.name).await {
231        Ok(_) => Ok(()),
232        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233            format!("{}/{}", cluster_ns, cluster_ref.name),
234        )),
235        Err(e) => Err(OperatorError::from(e)),
236    }
237}
238
239/// Build ConfigMap containing the connect pipeline configuration
240fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241    let name = connect.name_any();
242    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244    // Build pipeline YAML configuration
245    let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247    let mut labels = BTreeMap::new();
248    labels.insert(
249        "app.kubernetes.io/name".to_string(),
250        "rivven-connect".to_string(),
251    );
252    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253    labels.insert(
254        "app.kubernetes.io/component".to_string(),
255        "connect".to_string(),
256    );
257    labels.insert(
258        "app.kubernetes.io/managed-by".to_string(),
259        "rivven-operator".to_string(),
260    );
261
262    let mut data = BTreeMap::new();
263    data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265    Ok(ConfigMap {
266        metadata: ObjectMeta {
267            name: Some(format!("rivven-connect-{}", name)),
268            namespace: Some(namespace),
269            labels: Some(labels),
270            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
271            ..Default::default()
272        },
273        data: Some(data),
274        ..Default::default()
275    })
276}
277
278/// Build the pipeline YAML from the spec
279fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
280    use std::fmt::Write;
281
282    let mut yaml = String::new();
283
284    writeln!(yaml, "version: \"1.0\"").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
285    writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
286
287    // Broker configuration
288    writeln!(yaml, "broker:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
289    writeln!(yaml, "  bootstrap_servers:")
290        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
291
292    // Use cluster service discovery
293    let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
294    let cluster_svc = format!(
295        "rivven-{}-client.{}.svc.cluster.local:9092",
296        spec.cluster_ref.name, cluster_ns
297    );
298    writeln!(yaml, "    - {}", cluster_svc)
299        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
300    writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
301
302    // Global settings
303    if !spec.sources.is_empty() || !spec.sinks.is_empty() {
304        writeln!(yaml, "settings:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
305        writeln!(yaml, "  topic:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
306        writeln!(yaml, "    auto_create: {}", spec.settings.topic.auto_create)
307            .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
308        writeln!(
309            yaml,
310            "    default_partitions: {}",
311            spec.settings.topic.default_partitions
312        )
313        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
314        writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
315    }
316
317    // Sources
318    if !spec.sources.is_empty() {
319        writeln!(yaml, "sources:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
320        for source in &spec.sources {
321            writeln!(yaml, "  {}:", source.name)
322                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
323            writeln!(yaml, "    connector: {}", source.connector)
324                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
325            writeln!(yaml, "    topic: {}", source.topic)
326                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
327            writeln!(yaml, "    enabled: {}", source.enabled)
328                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
329
330            // Write config - merge topic_routing into config for CDC connectors
331            // topic_routing is CDC-specific and must be in the connector config section
332            let mut config = source.config.clone();
333            if let Some(ref routing) = source.topic_routing {
334                if let serde_json::Value::Object(ref mut map) = config {
335                    map.insert("topic_routing".to_string(), serde_json::json!(routing));
336                } else if config.is_null() {
337                    config = serde_json::json!({"topic_routing": routing});
338                }
339            }
340
341            if !config.is_null() {
342                writeln!(yaml, "    config:")
343                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
344                // Serialize JSON config to YAML
345                let config_str = serde_json::to_string_pretty(&config)
346                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
347                for line in config_str.lines() {
348                    writeln!(yaml, "      {}", line)
349                        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
350                }
351            }
352        }
353        writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
354    }
355
356    // Sinks
357    if !spec.sinks.is_empty() {
358        writeln!(yaml, "sinks:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
359        for sink in &spec.sinks {
360            writeln!(yaml, "  {}:", sink.name)
361                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
362            writeln!(yaml, "    connector: {}", sink.connector)
363                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
364            writeln!(yaml, "    topics: {:?}", sink.topics)
365                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
366            writeln!(yaml, "    consumer_group: {}", sink.consumer_group)
367                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
368            writeln!(yaml, "    enabled: {}", sink.enabled)
369                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
370
371            // Write config if not null/empty
372            if !sink.config.is_null() {
373                writeln!(yaml, "    config:")
374                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
375                let config_str = serde_json::to_string_pretty(&sink.config)
376                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
377                for line in config_str.lines() {
378                    writeln!(yaml, "      {}", line)
379                        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
380                }
381            }
382        }
383    }
384
385    Ok(yaml)
386}
387
388/// Build the Deployment for connect workers
389fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
390    let name = connect.name_any();
391    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
392    let spec = &connect.spec;
393
394    let mut labels = BTreeMap::new();
395    labels.insert(
396        "app.kubernetes.io/name".to_string(),
397        "rivven-connect".to_string(),
398    );
399    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
400    labels.insert(
401        "app.kubernetes.io/component".to_string(),
402        "connect".to_string(),
403    );
404    labels.insert(
405        "app.kubernetes.io/managed-by".to_string(),
406        "rivven-operator".to_string(),
407    );
408
409    // Merge user labels
410    for (k, v) in &spec.pod_labels {
411        if !k.starts_with("app.kubernetes.io/") {
412            labels.insert(k.clone(), v.clone());
413        }
414    }
415
416    let image = spec
417        .image
418        .clone()
419        .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
420
421    // Build container
422    let container = k8s_openapi::api::core::v1::Container {
423        name: "connect".to_string(),
424        image: Some(image),
425        image_pull_policy: Some(spec.image_pull_policy.clone()),
426        args: Some(vec![
427            "run".to_string(),
428            "--config".to_string(),
429            "/config/pipeline.yaml".to_string(),
430        ]),
431        env: Some(spec.env.clone()),
432        volume_mounts: Some(vec![
433            k8s_openapi::api::core::v1::VolumeMount {
434                name: "config".to_string(),
435                mount_path: "/config".to_string(),
436                read_only: Some(true),
437                ..Default::default()
438            },
439            k8s_openapi::api::core::v1::VolumeMount {
440                name: "data".to_string(),
441                mount_path: "/data".to_string(),
442                ..Default::default()
443            },
444        ]),
445        resources: spec
446            .resources
447            .as_ref()
448            .and_then(|r| serde_json::from_value(r.clone()).ok()),
449        security_context: spec
450            .container_security_context
451            .as_ref()
452            .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
453        liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
454            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
455                path: Some("/health".to_string()),
456                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
457                ..Default::default()
458            }),
459            initial_delay_seconds: Some(30),
460            period_seconds: Some(10),
461            ..Default::default()
462        }),
463        readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
464            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
465                path: Some("/ready".to_string()),
466                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
467                ..Default::default()
468            }),
469            initial_delay_seconds: Some(10),
470            period_seconds: Some(5),
471            ..Default::default()
472        }),
473        ports: Some(vec![
474            k8s_openapi::api::core::v1::ContainerPort {
475                name: Some("http".to_string()),
476                container_port: 8080,
477                ..Default::default()
478            },
479            k8s_openapi::api::core::v1::ContainerPort {
480                name: Some("metrics".to_string()),
481                container_port: 9090,
482                ..Default::default()
483            },
484        ]),
485        ..Default::default()
486    };
487
488    // Build volumes
489    let volumes = vec![
490        k8s_openapi::api::core::v1::Volume {
491            name: "config".to_string(),
492            config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
493                name: format!("rivven-connect-{}", name),
494                ..Default::default()
495            }),
496            ..Default::default()
497        },
498        k8s_openapi::api::core::v1::Volume {
499            name: "data".to_string(),
500            empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
501            ..Default::default()
502        },
503    ];
504
505    // Build image pull secrets
506    let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
507        None
508    } else {
509        Some(
510            spec.image_pull_secrets
511                .iter()
512                .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
513                .collect(),
514        )
515    };
516
517    let pod_spec = k8s_openapi::api::core::v1::PodSpec {
518        containers: vec![container],
519        volumes: Some(volumes),
520        image_pull_secrets,
521        service_account_name: spec.service_account.clone(),
522        node_selector: if spec.node_selector.is_empty() {
523            None
524        } else {
525            Some(spec.node_selector.clone())
526        },
527        tolerations: if spec.tolerations.is_empty() {
528            None
529        } else {
530            Some(spec.tolerations.clone())
531        },
532        affinity: spec
533            .affinity
534            .as_ref()
535            .and_then(|a| serde_json::from_value(a.clone()).ok()),
536        security_context: spec
537            .security_context
538            .as_ref()
539            .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
540        ..Default::default()
541    };
542
543    Ok(Deployment {
544        metadata: ObjectMeta {
545            name: Some(format!("rivven-connect-{}", name)),
546            namespace: Some(namespace),
547            labels: Some(labels.clone()),
548            annotations: Some(spec.pod_annotations.clone()),
549            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
550            ..Default::default()
551        },
552        spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
553            replicas: Some(spec.replicas),
554            selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
555                match_labels: Some(labels.clone()),
556                ..Default::default()
557            },
558            template: k8s_openapi::api::core::v1::PodTemplateSpec {
559                metadata: Some(ObjectMeta {
560                    labels: Some(labels),
561                    annotations: Some(spec.pod_annotations.clone()),
562                    ..Default::default()
563                }),
564                spec: Some(pod_spec),
565            },
566            ..Default::default()
567        }),
568        ..Default::default()
569    })
570}
571
572/// Build the Service for connect workers
573fn build_connect_service(connect: &RivvenConnect) -> Service {
574    let name = connect.name_any();
575    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
576
577    let mut labels = BTreeMap::new();
578    labels.insert(
579        "app.kubernetes.io/name".to_string(),
580        "rivven-connect".to_string(),
581    );
582    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
583    labels.insert(
584        "app.kubernetes.io/component".to_string(),
585        "connect".to_string(),
586    );
587    labels.insert(
588        "app.kubernetes.io/managed-by".to_string(),
589        "rivven-operator".to_string(),
590    );
591
592    Service {
593        metadata: ObjectMeta {
594            name: Some(format!("rivven-connect-{}", name)),
595            namespace: Some(namespace),
596            labels: Some(labels.clone()),
597            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
598            ..Default::default()
599        },
600        spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
601            selector: Some(labels),
602            ports: Some(vec![
603                k8s_openapi::api::core::v1::ServicePort {
604                    name: Some("http".to_string()),
605                    port: 8080,
606                    target_port: Some(
607                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
608                    ),
609                    ..Default::default()
610                },
611                k8s_openapi::api::core::v1::ServicePort {
612                    name: Some("metrics".to_string()),
613                    port: 9090,
614                    target_port: Some(
615                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
616                    ),
617                    ..Default::default()
618                },
619            ]),
620            ..Default::default()
621        }),
622        ..Default::default()
623    }
624}
625
626/// Apply ConfigMap using server-side apply
627async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
628    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
629    let name = cm.metadata.name.as_ref().unwrap();
630
631    debug!(name = %name, "Applying Connect ConfigMap");
632
633    let patch_params = PatchParams::apply("rivven-operator").force();
634    api.patch(name, &patch_params, &Patch::Apply(&cm))
635        .await
636        .map_err(OperatorError::from)?;
637
638    Ok(())
639}
640
641/// Apply Deployment using server-side apply
642async fn apply_connect_deployment(
643    client: &Client,
644    namespace: &str,
645    deployment: Deployment,
646) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
647    let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
648    let name = deployment.metadata.name.as_ref().unwrap();
649
650    debug!(name = %name, "Applying Connect Deployment");
651
652    let patch_params = PatchParams::apply("rivven-operator").force();
653    let result = api
654        .patch(name, &patch_params, &Patch::Apply(&deployment))
655        .await
656        .map_err(OperatorError::from)?;
657
658    Ok(result.status)
659}
660
661/// Apply Service using server-side apply
662async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
663    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
664    let name = svc.metadata.name.as_ref().unwrap();
665
666    debug!(name = %name, "Applying Connect Service");
667
668    let patch_params = PatchParams::apply("rivven-operator").force();
669    api.patch(name, &patch_params, &Patch::Apply(&svc))
670        .await
671        .map_err(OperatorError::from)?;
672
673    Ok(())
674}
675
676/// Build connect status from deployment status
677fn build_connect_status(
678    connect: &RivvenConnect,
679    deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
680) -> RivvenConnectStatus {
681    let now = Utc::now().to_rfc3339();
682    let spec = &connect.spec;
683
684    let (replicas, ready_replicas, updated_replicas) = deploy_status
685        .map(|s| {
686            (
687                s.replicas.unwrap_or(0),
688                s.ready_replicas.unwrap_or(0),
689                s.updated_replicas.unwrap_or(0),
690            )
691        })
692        .unwrap_or((0, 0, 0));
693
694    let desired_replicas = spec.replicas;
695
696    // Determine phase
697    let phase = if ready_replicas == 0 {
698        ConnectPhase::Pending
699    } else if ready_replicas < desired_replicas {
700        if updated_replicas < desired_replicas {
701            ConnectPhase::Starting
702        } else {
703            ConnectPhase::Degraded
704        }
705    } else {
706        ConnectPhase::Running
707    };
708
709    // Count configured connectors
710    let sources_total = spec.sources.len() as i32;
711    let sinks_total = spec.sinks.len() as i32;
712
713    // Estimate running connectors (simplified - in production, query the connect workers)
714    let sources_running = if phase == ConnectPhase::Running {
715        spec.sources.iter().filter(|s| s.enabled).count() as i32
716    } else {
717        0
718    };
719    let sinks_running = if phase == ConnectPhase::Running {
720        spec.sinks.iter().filter(|s| s.enabled).count() as i32
721    } else {
722        0
723    };
724
725    // Build conditions
726    let mut conditions = vec![];
727
728    conditions.push(ConnectCondition {
729        condition_type: "Ready".to_string(),
730        status: if ready_replicas >= desired_replicas {
731            "True".to_string()
732        } else {
733            "False".to_string()
734        },
735        reason: Some(format!(
736            "{}/{} replicas ready",
737            ready_replicas, desired_replicas
738        )),
739        message: None,
740        last_transition_time: Some(now.clone()),
741    });
742
743    conditions.push(ConnectCondition {
744        condition_type: "BrokerConnected".to_string(),
745        status: if phase == ConnectPhase::Running {
746            "True".to_string()
747        } else {
748            "Unknown".to_string()
749        },
750        reason: Some("ClusterRefValid".to_string()),
751        message: None,
752        last_transition_time: Some(now.clone()),
753    });
754
755    conditions.push(ConnectCondition {
756        condition_type: "SourcesHealthy".to_string(),
757        status: if sources_running == sources_total && sources_total > 0 {
758            "True".to_string()
759        } else if sources_running > 0 {
760            "Partial".to_string()
761        } else if sources_total == 0 {
762            "N/A".to_string()
763        } else {
764            "False".to_string()
765        },
766        reason: Some(format!(
767            "{}/{} sources running",
768            sources_running, sources_total
769        )),
770        message: None,
771        last_transition_time: Some(now.clone()),
772    });
773
774    conditions.push(ConnectCondition {
775        condition_type: "SinksHealthy".to_string(),
776        status: if sinks_running == sinks_total && sinks_total > 0 {
777            "True".to_string()
778        } else if sinks_running > 0 {
779            "Partial".to_string()
780        } else if sinks_total == 0 {
781            "N/A".to_string()
782        } else {
783            "False".to_string()
784        },
785        reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
786        message: None,
787        last_transition_time: Some(now.clone()),
788    });
789
790    // Build connector statuses
791    let mut connector_statuses = Vec::new();
792
793    for source in &spec.sources {
794        connector_statuses.push(ConnectorStatus {
795            name: source.name.clone(),
796            connector_type: "source".to_string(),
797            kind: source.connector.clone(),
798            state: if source.enabled && phase == ConnectPhase::Running {
799                "running".to_string()
800            } else if !source.enabled {
801                "disabled".to_string()
802            } else {
803                "pending".to_string()
804            },
805            events_processed: 0, // Would need to query metrics
806            last_error: None,
807            last_success_time: None,
808        });
809    }
810
811    for sink in &spec.sinks {
812        connector_statuses.push(ConnectorStatus {
813            name: sink.name.clone(),
814            connector_type: "sink".to_string(),
815            kind: sink.connector.clone(),
816            state: if sink.enabled && phase == ConnectPhase::Running {
817                "running".to_string()
818            } else if !sink.enabled {
819                "disabled".to_string()
820            } else {
821                "pending".to_string()
822            },
823            events_processed: 0,
824            last_error: None,
825            last_success_time: None,
826        });
827    }
828
829    RivvenConnectStatus {
830        phase,
831        replicas,
832        ready_replicas,
833        sources_running,
834        sinks_running,
835        sources_total,
836        sinks_total,
837        observed_generation: connect.metadata.generation.unwrap_or(0),
838        conditions,
839        connector_statuses,
840        last_updated: Some(now),
841        message: None,
842    }
843}
844
845/// Build a failed status
846fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
847    let now = Utc::now().to_rfc3339();
848
849    RivvenConnectStatus {
850        phase: ConnectPhase::Failed,
851        replicas: 0,
852        ready_replicas: 0,
853        sources_running: 0,
854        sinks_running: 0,
855        sources_total: connect.spec.sources.len() as i32,
856        sinks_total: connect.spec.sinks.len() as i32,
857        observed_generation: connect.metadata.generation.unwrap_or(0),
858        conditions: vec![ConnectCondition {
859            condition_type: "Ready".to_string(),
860            status: "False".to_string(),
861            reason: Some("ValidationFailed".to_string()),
862            message: Some(error_msg.to_string()),
863            last_transition_time: Some(now.clone()),
864        }],
865        connector_statuses: vec![],
866        last_updated: Some(now),
867        message: Some(error_msg.to_string()),
868    }
869}
870
871/// Update the connect status subresource
872async fn update_connect_status(
873    client: &Client,
874    namespace: &str,
875    name: &str,
876    status: RivvenConnectStatus,
877) -> Result<()> {
878    let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
879
880    debug!(name = %name, phase = ?status.phase, "Updating connect status");
881
882    let patch = serde_json::json!({
883        "status": status
884    });
885
886    let patch_params = PatchParams::default();
887    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
888        .await
889        .map_err(OperatorError::from)?;
890
891    Ok(())
892}
893
894/// Cleanup resources when connect is deleted
895#[instrument(skip(connect, _ctx))]
896async fn cleanup_connect(
897    connect: Arc<RivvenConnect>,
898    _ctx: Arc<ConnectControllerContext>,
899) -> Result<Action> {
900    let name = connect.name_any();
901    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
902
903    info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
904
905    // Resources with owner references will be garbage collected automatically
906    // Additional cleanup could include:
907    // 1. Graceful connector shutdown
908    // 2. Offset commit for sinks
909    // 3. External resource cleanup
910
911    info!(name = %name, "Connect cleanup complete");
912
913    Ok(Action::await_change())
914}
915
916/// Error policy for the connect controller
917fn connect_error_policy(
918    _connect: Arc<RivvenConnect>,
919    error: &OperatorError,
920    _ctx: Arc<ConnectControllerContext>,
921) -> Action {
922    warn!(
923        error = %error,
924        "Connect reconciliation error, will retry"
925    );
926
927    let delay = error
928        .requeue_delay()
929        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
930
931    Action::requeue(delay)
932}
933
#[cfg(test)]
mod tests {
    //! Unit tests for the status builders and the Kubernetes object builders
    //! of the connect controller. All tests share one fixture: a RivvenConnect
    //! with one enabled source and one enabled sink.

    use super::*;
    use crate::crd::{
        ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
        SourceConnectorSpec, SourceTopicConfigSpec,
    };
    use k8s_openapi::api::apps::v1::DeploymentStatus;

    /// Name every generated Kubernetes object is expected to carry for the fixture.
    const EXPECTED_OBJECT_NAME: &str = "rivven-connect-test-connect";

    /// Fixture: `test-connect` in namespace `default`, generation 1, two
    /// replicas, one `datagen` source and one `stdout` sink, both enabled.
    fn create_test_connect() -> RivvenConnect {
        let source = SourceConnectorSpec {
            name: String::from("test-source"),
            connector: String::from("datagen"),
            topic: String::from("test-topic"),
            topic_routing: None,
            enabled: true,
            config: serde_json::Value::Null,
            config_secret_ref: None,
            topic_config: SourceTopicConfigSpec::default(),
        };

        let sink = SinkConnectorSpec {
            name: String::from("test-sink"),
            connector: String::from("stdout"),
            topics: vec![String::from("test-topic")],
            consumer_group: String::from("test-group"),
            enabled: true,
            start_offset: String::from("latest"),
            config: serde_json::Value::Null,
            config_secret_ref: None,
            rate_limit: RateLimitSpec::default(),
        };

        let metadata = ObjectMeta {
            name: Some(String::from("test-connect")),
            namespace: Some(String::from("default")),
            uid: Some(String::from("test-uid")),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenConnectSpec {
            cluster_ref: ClusterReference {
                name: String::from("test-cluster"),
                namespace: None,
            },
            replicas: 2,
            version: String::from("0.0.1"),
            image: None,
            image_pull_policy: String::from("IfNotPresent"),
            image_pull_secrets: Vec::new(),
            resources: None,
            config: ConnectConfigSpec::default(),
            sources: vec![source],
            sinks: vec![sink],
            settings: GlobalConnectSettings::default(),
            tls: ConnectTlsSpec::default(),
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: Vec::new(),
            node_selector: BTreeMap::new(),
            tolerations: Vec::new(),
            affinity: None,
            service_account: None,
            security_context: None,
            container_security_context: None,
        };

        RivvenConnect {
            metadata,
            spec,
            status: None,
        }
    }

    /// Deployment status with `total` replicas, all of them updated, and
    /// `ready` of them ready.
    fn deployment_status(total: i32, ready: i32) -> DeploymentStatus {
        DeploymentStatus {
            replicas: Some(total),
            ready_replicas: Some(ready),
            updated_replicas: Some(total),
            ..Default::default()
        }
    }

    /// Status computed for the fixture when the deployment has 2/2 ready replicas.
    fn fully_ready_status() -> RivvenConnectStatus {
        build_connect_status(&create_test_connect(), Some(deployment_status(2, 2)))
    }

    /// Look up a connector status by name, panicking when it is missing.
    fn connector<'a>(status: &'a RivvenConnectStatus, name: &str) -> &'a ConnectorStatus {
        status
            .connector_statuses
            .iter()
            .find(|entry| entry.name == name)
            .unwrap()
    }

    /// Look up a condition by type, panicking when it is missing.
    fn condition<'a>(status: &'a RivvenConnectStatus, kind: &str) -> &'a ConnectCondition {
        status
            .conditions
            .iter()
            .find(|entry| entry.condition_type == kind)
            .unwrap()
    }

    #[test]
    fn test_build_connect_status_pending() {
        // No deployment status yet: nothing is running, totals come from the spec.
        let resource = create_test_connect();
        let computed = build_connect_status(&resource, None);

        assert_eq!(computed.phase, ConnectPhase::Pending);
        assert_eq!(computed.replicas, 0);
        assert_eq!(computed.ready_replicas, 0);
        assert_eq!(computed.sources_total, 1);
        assert_eq!(computed.sinks_total, 1);
    }

    #[test]
    fn test_build_connect_status_running() {
        let computed = fully_ready_status();

        assert_eq!(computed.phase, ConnectPhase::Running);
        assert_eq!(computed.replicas, 2);
        assert_eq!(computed.ready_replicas, 2);
        assert_eq!(computed.sources_running, 1);
        assert_eq!(computed.sinks_running, 1);
    }

    #[test]
    fn test_build_connect_status_degraded() {
        // Only one of two replicas is ready.
        let resource = create_test_connect();
        let computed = build_connect_status(&resource, Some(deployment_status(2, 1)));

        assert_eq!(computed.phase, ConnectPhase::Degraded);
    }

    #[test]
    fn test_build_failed_status() {
        let resource = create_test_connect();
        let computed = build_failed_status(&resource, "Test error");

        assert_eq!(computed.phase, ConnectPhase::Failed);
        assert_eq!(computed.message, Some(String::from("Test error")));
        assert_eq!(computed.conditions.len(), 1);
        assert_eq!(computed.conditions[0].status, "False");
    }

    #[test]
    fn test_build_pipeline_yaml() {
        let resource = create_test_connect();
        let rendered = build_pipeline_yaml(&resource.spec).unwrap();

        assert!(rendered.contains(r#"version: "1.0""#));
        assert!(rendered.contains("bootstrap_servers:"));
        assert!(rendered.contains("rivven-test-cluster-client"));
        assert!(rendered.contains("sources:"));
        assert!(rendered.contains("test-source:"));
        assert!(rendered.contains("sinks:"));
        assert!(rendered.contains("test-sink:"));
    }

    #[test]
    fn test_build_connect_configmap() {
        let resource = create_test_connect();
        let config_map = build_connect_configmap(&resource).unwrap();

        assert_eq!(
            config_map.metadata.name,
            Some(String::from(EXPECTED_OBJECT_NAME))
        );

        let entries = config_map.data.unwrap();
        assert!(entries.contains_key("pipeline.yaml"));
    }

    #[test]
    fn test_build_connect_deployment() {
        let resource = create_test_connect();
        let built = build_connect_deployment(&resource).unwrap();

        assert_eq!(
            built.metadata.name,
            Some(String::from(EXPECTED_OBJECT_NAME))
        );

        let deployment_spec = built.spec.as_ref().unwrap();
        assert_eq!(deployment_spec.replicas, Some(2));
    }

    #[test]
    fn test_build_connect_service() {
        let resource = create_test_connect();
        let built = build_connect_service(&resource);

        assert_eq!(
            built.metadata.name,
            Some(String::from(EXPECTED_OBJECT_NAME))
        );

        let service_spec = built.spec.as_ref().unwrap();
        let exposed_ports = service_spec.ports.as_ref().unwrap();
        assert_eq!(exposed_ports.len(), 2);
    }

    #[test]
    fn test_connector_statuses() {
        let computed = fully_ready_status();

        assert_eq!(computed.connector_statuses.len(), 2);

        let source_entry = connector(&computed, "test-source");
        assert_eq!(source_entry.connector_type, "source");
        assert_eq!(source_entry.state, "running");

        let sink_entry = connector(&computed, "test-sink");
        assert_eq!(sink_entry.connector_type, "sink");
        assert_eq!(sink_entry.state, "running");
    }

    #[test]
    fn test_conditions() {
        let computed = fully_ready_status();

        assert_eq!(computed.conditions.len(), 4);

        assert_eq!(condition(&computed, "Ready").status, "True");
        assert_eq!(condition(&computed, "BrokerConnected").status, "True");
    }
}