Skip to main content

rivven_operator/
connect_controller.rs

1//! RivvenConnect Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenConnect
4//! custom resources. It watches for changes and reconciles connector pipelines.
5
6use crate::crd::{
7    ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8    RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer the operator places on every `RivvenConnect`, so that cleanup
/// logic runs before the object is finally removed.
pub const CONNECT_FINALIZER: &str = "rivven.io/connect-finalizer";

/// Requeue interval (seconds) after a successful reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Requeue interval (seconds) used for error cases.
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
36/// Context passed to the connect controller
37pub struct ConnectControllerContext {
38    /// Kubernetes client
39    pub client: Client,
40    /// Metrics recorder
41    pub metrics: Option<ConnectControllerMetrics>,
42}
43
44/// Metrics for the connect controller
45#[derive(Clone)]
46pub struct ConnectControllerMetrics {
47    /// Counter for reconciliation attempts
48    pub reconciliations: metrics::Counter,
49    /// Counter for reconciliation errors
50    pub errors: metrics::Counter,
51    /// Histogram for reconciliation duration
52    pub duration: metrics::Histogram,
53}
54
55impl ConnectControllerMetrics {
56    /// Create new connect controller metrics
57    pub fn new() -> Self {
58        Self {
59            reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60            errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61            duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62        }
63    }
64}
65
66impl Default for ConnectControllerMetrics {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72/// Start the RivvenConnect controller
73pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74    let connects: Api<RivvenConnect> = match &namespace {
75        Some(ns) => Api::namespaced(client.clone(), ns),
76        None => Api::all(client.clone()),
77    };
78
79    let ctx = Arc::new(ConnectControllerContext {
80        client: client.clone(),
81        metrics: Some(ConnectControllerMetrics::new()),
82    });
83
84    info!(
85        namespace = namespace.as_deref().unwrap_or("all"),
86        "Starting RivvenConnect controller"
87    );
88
89    // Watch related resources for changes
90    let deployments = match &namespace {
91        Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92        None => Api::<Deployment>::all(client.clone()),
93    };
94
95    let configmaps = match &namespace {
96        Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97        None => Api::<ConfigMap>::all(client.clone()),
98    };
99
100    Controller::new(connects.clone(), Config::default())
101        .owns(deployments, Config::default())
102        .owns(configmaps, Config::default())
103        .run(reconcile_connect, connect_error_policy, ctx)
104        .for_each(|result| async move {
105            match result {
106                Ok((obj, action)) => {
107                    debug!(
108                        name = obj.name,
109                        namespace = obj.namespace,
110                        ?action,
111                        "Connect reconciliation completed"
112                    );
113                }
114                Err(e) => {
115                    error!(error = %e, "Connect reconciliation failed");
116                }
117            }
118        })
119        .await;
120
121    Ok(())
122}
123
124/// Main reconciliation function for RivvenConnect
125#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127    connect: Arc<RivvenConnect>,
128    ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130    let start = std::time::Instant::now();
131
132    if let Some(ref metrics) = ctx.metrics {
133        metrics.reconciliations.increment(1);
134    }
135
136    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137    let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139    let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140        match event {
141            FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142            FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143        }
144    })
145    .await;
146
147    if let Some(ref metrics) = ctx.metrics {
148        metrics.duration.record(start.elapsed().as_secs_f64());
149    }
150
151    result.map_err(|e| {
152        if let Some(ref metrics) = ctx.metrics {
153            metrics.errors.increment(1);
154        }
155        OperatorError::ReconcileFailed(e.to_string())
156    })
157}
158
159/// Apply (create/update) the connect resources
160#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162    connect: Arc<RivvenConnect>,
163    ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165    let name = connect.name_any();
166    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168    info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170    // Validate the connect spec
171    if let Err(errors) = connect.spec.validate() {
172        let error_messages: Vec<String> = errors
173            .field_errors()
174            .iter()
175            .flat_map(|(field, errs)| {
176                errs.iter()
177                    .map(move |e| format!("{}: {:?}", field, e.message))
178            })
179            .collect();
180        let error_msg = error_messages.join("; ");
181        warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183        // Update status to failed
184        update_connect_status(
185            &ctx.client,
186            &namespace,
187            &name,
188            build_failed_status(&connect, &error_msg),
189        )
190        .await?;
191
192        return Err(OperatorError::InvalidConfig(error_msg));
193    }
194
195    // Verify cluster reference exists
196    verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198    // Build and apply ConfigMap with pipeline configuration
199    let configmap = build_connect_configmap(&connect)?;
200    apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202    // Build and apply Deployment
203    let deployment = build_connect_deployment(&connect)?;
204    let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206    // Build and apply headless Service for connect workers
207    let service = build_connect_service(&connect);
208    apply_connect_service(&ctx.client, &namespace, service).await?;
209
210    // Update connect status
211    let status = build_connect_status(&connect, deploy_status);
212    update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214    info!(name = %name, "Connect reconciliation complete");
215
216    Ok(Action::requeue(Duration::from_secs(
217        DEFAULT_REQUEUE_SECONDS,
218    )))
219}
220
221/// Verify the referenced RivvenCluster exists
222async fn verify_cluster_ref(
223    client: &Client,
224    namespace: &str,
225    cluster_ref: &ClusterReference,
226) -> Result<()> {
227    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230    match clusters.get(&cluster_ref.name).await {
231        Ok(_) => Ok(()),
232        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233            format!("{}/{}", cluster_ns, cluster_ref.name),
234        )),
235        Err(e) => Err(OperatorError::from(e)),
236    }
237}
238
239/// Build ConfigMap containing the connect pipeline configuration
240fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241    let name = connect.name_any();
242    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244    // Build pipeline YAML configuration
245    let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247    let mut labels = BTreeMap::new();
248    labels.insert(
249        "app.kubernetes.io/name".to_string(),
250        "rivven-connect".to_string(),
251    );
252    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253    labels.insert(
254        "app.kubernetes.io/component".to_string(),
255        "connect".to_string(),
256    );
257    labels.insert(
258        "app.kubernetes.io/managed-by".to_string(),
259        "rivven-operator".to_string(),
260    );
261
262    let mut data = BTreeMap::new();
263    data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265    Ok(ConfigMap {
266        metadata: ObjectMeta {
267            name: Some(format!("rivven-connect-{}", name)),
268            namespace: Some(namespace),
269            labels: Some(labels),
270            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
271            ..Default::default()
272        },
273        data: Some(data),
274        ..Default::default()
275    })
276}
277
278/// Build the pipeline YAML from the spec
279fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
280    use std::fmt::Write;
281
282    let mut yaml = String::new();
283
284    writeln!(yaml, "version: \"1.0\"").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
285    writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
286
287    // Broker configuration
288    writeln!(yaml, "broker:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
289    writeln!(yaml, "  bootstrap_servers:")
290        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
291
292    // Use cluster service discovery
293    let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
294    let cluster_svc = format!(
295        "rivven-{}-client.{}.svc.cluster.local:9092",
296        spec.cluster_ref.name, cluster_ns
297    );
298    writeln!(yaml, "    - {}", cluster_svc)
299        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
300    writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
301
302    // Global settings
303    if !spec.sources.is_empty() || !spec.sinks.is_empty() {
304        writeln!(yaml, "settings:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
305        writeln!(yaml, "  topic:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
306        writeln!(yaml, "    auto_create: {}", spec.settings.topic.auto_create)
307            .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
308        writeln!(
309            yaml,
310            "    default_partitions: {}",
311            spec.settings.topic.default_partitions
312        )
313        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
314        writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
315    }
316
317    // Sources
318    if !spec.sources.is_empty() {
319        writeln!(yaml, "sources:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
320        for source in &spec.sources {
321            writeln!(yaml, "  {}:", source.name)
322                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
323            writeln!(yaml, "    connector: {}", source.connector)
324                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
325            writeln!(yaml, "    topic: {}", source.topic)
326                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
327            writeln!(yaml, "    enabled: {}", source.enabled)
328                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
329
330            // Write config if not null/empty
331            if !source.config.is_null() {
332                writeln!(yaml, "    config:")
333                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
334                // Serialize JSON config to YAML
335                let config_str = serde_json::to_string_pretty(&source.config)
336                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
337                for line in config_str.lines() {
338                    writeln!(yaml, "      {}", line)
339                        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
340                }
341            }
342        }
343        writeln!(yaml).map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
344    }
345
346    // Sinks
347    if !spec.sinks.is_empty() {
348        writeln!(yaml, "sinks:").map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
349        for sink in &spec.sinks {
350            writeln!(yaml, "  {}:", sink.name)
351                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
352            writeln!(yaml, "    connector: {}", sink.connector)
353                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
354            writeln!(yaml, "    topics: {:?}", sink.topics)
355                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
356            writeln!(yaml, "    consumer_group: {}", sink.consumer_group)
357                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
358            writeln!(yaml, "    enabled: {}", sink.enabled)
359                .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
360
361            // Write config if not null/empty
362            if !sink.config.is_null() {
363                writeln!(yaml, "    config:")
364                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
365                let config_str = serde_json::to_string_pretty(&sink.config)
366                    .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
367                for line in config_str.lines() {
368                    writeln!(yaml, "      {}", line)
369                        .map_err(|e| OperatorError::InvalidConfig(e.to_string()))?;
370                }
371            }
372        }
373    }
374
375    Ok(yaml)
376}
377
378/// Build the Deployment for connect workers
379fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
380    let name = connect.name_any();
381    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
382    let spec = &connect.spec;
383
384    let mut labels = BTreeMap::new();
385    labels.insert(
386        "app.kubernetes.io/name".to_string(),
387        "rivven-connect".to_string(),
388    );
389    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
390    labels.insert(
391        "app.kubernetes.io/component".to_string(),
392        "connect".to_string(),
393    );
394    labels.insert(
395        "app.kubernetes.io/managed-by".to_string(),
396        "rivven-operator".to_string(),
397    );
398
399    // Merge user labels
400    for (k, v) in &spec.pod_labels {
401        if !k.starts_with("app.kubernetes.io/") {
402            labels.insert(k.clone(), v.clone());
403        }
404    }
405
406    let image = spec
407        .image
408        .clone()
409        .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
410
411    // Build container
412    let container = k8s_openapi::api::core::v1::Container {
413        name: "connect".to_string(),
414        image: Some(image),
415        image_pull_policy: Some(spec.image_pull_policy.clone()),
416        args: Some(vec![
417            "run".to_string(),
418            "--config".to_string(),
419            "/config/pipeline.yaml".to_string(),
420        ]),
421        env: Some(spec.env.clone()),
422        volume_mounts: Some(vec![
423            k8s_openapi::api::core::v1::VolumeMount {
424                name: "config".to_string(),
425                mount_path: "/config".to_string(),
426                read_only: Some(true),
427                ..Default::default()
428            },
429            k8s_openapi::api::core::v1::VolumeMount {
430                name: "data".to_string(),
431                mount_path: "/data".to_string(),
432                ..Default::default()
433            },
434        ]),
435        resources: spec
436            .resources
437            .as_ref()
438            .and_then(|r| serde_json::from_value(r.clone()).ok()),
439        security_context: spec
440            .container_security_context
441            .as_ref()
442            .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
443        liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
444            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
445                path: Some("/health".to_string()),
446                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
447                ..Default::default()
448            }),
449            initial_delay_seconds: Some(30),
450            period_seconds: Some(10),
451            ..Default::default()
452        }),
453        readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
454            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
455                path: Some("/ready".to_string()),
456                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
457                ..Default::default()
458            }),
459            initial_delay_seconds: Some(10),
460            period_seconds: Some(5),
461            ..Default::default()
462        }),
463        ports: Some(vec![
464            k8s_openapi::api::core::v1::ContainerPort {
465                name: Some("http".to_string()),
466                container_port: 8080,
467                ..Default::default()
468            },
469            k8s_openapi::api::core::v1::ContainerPort {
470                name: Some("metrics".to_string()),
471                container_port: 9090,
472                ..Default::default()
473            },
474        ]),
475        ..Default::default()
476    };
477
478    // Build volumes
479    let volumes = vec![
480        k8s_openapi::api::core::v1::Volume {
481            name: "config".to_string(),
482            config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
483                name: format!("rivven-connect-{}", name),
484                ..Default::default()
485            }),
486            ..Default::default()
487        },
488        k8s_openapi::api::core::v1::Volume {
489            name: "data".to_string(),
490            empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
491            ..Default::default()
492        },
493    ];
494
495    // Build image pull secrets
496    let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
497        None
498    } else {
499        Some(
500            spec.image_pull_secrets
501                .iter()
502                .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
503                .collect(),
504        )
505    };
506
507    let pod_spec = k8s_openapi::api::core::v1::PodSpec {
508        containers: vec![container],
509        volumes: Some(volumes),
510        image_pull_secrets,
511        service_account_name: spec.service_account.clone(),
512        node_selector: if spec.node_selector.is_empty() {
513            None
514        } else {
515            Some(spec.node_selector.clone())
516        },
517        tolerations: if spec.tolerations.is_empty() {
518            None
519        } else {
520            Some(spec.tolerations.clone())
521        },
522        affinity: spec
523            .affinity
524            .as_ref()
525            .and_then(|a| serde_json::from_value(a.clone()).ok()),
526        security_context: spec
527            .security_context
528            .as_ref()
529            .and_then(|sc| serde_json::from_value(sc.clone()).ok()),
530        ..Default::default()
531    };
532
533    Ok(Deployment {
534        metadata: ObjectMeta {
535            name: Some(format!("rivven-connect-{}", name)),
536            namespace: Some(namespace),
537            labels: Some(labels.clone()),
538            annotations: Some(spec.pod_annotations.clone()),
539            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
540            ..Default::default()
541        },
542        spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
543            replicas: Some(spec.replicas),
544            selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
545                match_labels: Some(labels.clone()),
546                ..Default::default()
547            },
548            template: k8s_openapi::api::core::v1::PodTemplateSpec {
549                metadata: Some(ObjectMeta {
550                    labels: Some(labels),
551                    annotations: Some(spec.pod_annotations.clone()),
552                    ..Default::default()
553                }),
554                spec: Some(pod_spec),
555            },
556            ..Default::default()
557        }),
558        ..Default::default()
559    })
560}
561
562/// Build the Service for connect workers
563fn build_connect_service(connect: &RivvenConnect) -> Service {
564    let name = connect.name_any();
565    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
566
567    let mut labels = BTreeMap::new();
568    labels.insert(
569        "app.kubernetes.io/name".to_string(),
570        "rivven-connect".to_string(),
571    );
572    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
573    labels.insert(
574        "app.kubernetes.io/component".to_string(),
575        "connect".to_string(),
576    );
577    labels.insert(
578        "app.kubernetes.io/managed-by".to_string(),
579        "rivven-operator".to_string(),
580    );
581
582    Service {
583        metadata: ObjectMeta {
584            name: Some(format!("rivven-connect-{}", name)),
585            namespace: Some(namespace),
586            labels: Some(labels.clone()),
587            owner_references: Some(vec![connect.controller_owner_ref(&()).unwrap()]),
588            ..Default::default()
589        },
590        spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
591            selector: Some(labels),
592            ports: Some(vec![
593                k8s_openapi::api::core::v1::ServicePort {
594                    name: Some("http".to_string()),
595                    port: 8080,
596                    target_port: Some(
597                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
598                    ),
599                    ..Default::default()
600                },
601                k8s_openapi::api::core::v1::ServicePort {
602                    name: Some("metrics".to_string()),
603                    port: 9090,
604                    target_port: Some(
605                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
606                    ),
607                    ..Default::default()
608                },
609            ]),
610            ..Default::default()
611        }),
612        ..Default::default()
613    }
614}
615
616/// Apply ConfigMap using server-side apply
617async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
618    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
619    let name = cm.metadata.name.as_ref().unwrap();
620
621    debug!(name = %name, "Applying Connect ConfigMap");
622
623    let patch_params = PatchParams::apply("rivven-operator").force();
624    api.patch(name, &patch_params, &Patch::Apply(&cm))
625        .await
626        .map_err(OperatorError::from)?;
627
628    Ok(())
629}
630
631/// Apply Deployment using server-side apply
632async fn apply_connect_deployment(
633    client: &Client,
634    namespace: &str,
635    deployment: Deployment,
636) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
637    let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
638    let name = deployment.metadata.name.as_ref().unwrap();
639
640    debug!(name = %name, "Applying Connect Deployment");
641
642    let patch_params = PatchParams::apply("rivven-operator").force();
643    let result = api
644        .patch(name, &patch_params, &Patch::Apply(&deployment))
645        .await
646        .map_err(OperatorError::from)?;
647
648    Ok(result.status)
649}
650
651/// Apply Service using server-side apply
652async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
653    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
654    let name = svc.metadata.name.as_ref().unwrap();
655
656    debug!(name = %name, "Applying Connect Service");
657
658    let patch_params = PatchParams::apply("rivven-operator").force();
659    api.patch(name, &patch_params, &Patch::Apply(&svc))
660        .await
661        .map_err(OperatorError::from)?;
662
663    Ok(())
664}
665
666/// Build connect status from deployment status
667fn build_connect_status(
668    connect: &RivvenConnect,
669    deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
670) -> RivvenConnectStatus {
671    let now = Utc::now().to_rfc3339();
672    let spec = &connect.spec;
673
674    let (replicas, ready_replicas, updated_replicas) = deploy_status
675        .map(|s| {
676            (
677                s.replicas.unwrap_or(0),
678                s.ready_replicas.unwrap_or(0),
679                s.updated_replicas.unwrap_or(0),
680            )
681        })
682        .unwrap_or((0, 0, 0));
683
684    let desired_replicas = spec.replicas;
685
686    // Determine phase
687    let phase = if ready_replicas == 0 {
688        ConnectPhase::Pending
689    } else if ready_replicas < desired_replicas {
690        if updated_replicas < desired_replicas {
691            ConnectPhase::Starting
692        } else {
693            ConnectPhase::Degraded
694        }
695    } else {
696        ConnectPhase::Running
697    };
698
699    // Count configured connectors
700    let sources_total = spec.sources.len() as i32;
701    let sinks_total = spec.sinks.len() as i32;
702
703    // Estimate running connectors (simplified - in production, query the connect workers)
704    let sources_running = if phase == ConnectPhase::Running {
705        spec.sources.iter().filter(|s| s.enabled).count() as i32
706    } else {
707        0
708    };
709    let sinks_running = if phase == ConnectPhase::Running {
710        spec.sinks.iter().filter(|s| s.enabled).count() as i32
711    } else {
712        0
713    };
714
715    // Build conditions
716    let mut conditions = vec![];
717
718    conditions.push(ConnectCondition {
719        condition_type: "Ready".to_string(),
720        status: if ready_replicas >= desired_replicas {
721            "True".to_string()
722        } else {
723            "False".to_string()
724        },
725        reason: Some(format!(
726            "{}/{} replicas ready",
727            ready_replicas, desired_replicas
728        )),
729        message: None,
730        last_transition_time: Some(now.clone()),
731    });
732
733    conditions.push(ConnectCondition {
734        condition_type: "BrokerConnected".to_string(),
735        status: if phase == ConnectPhase::Running {
736            "True".to_string()
737        } else {
738            "Unknown".to_string()
739        },
740        reason: Some("ClusterRefValid".to_string()),
741        message: None,
742        last_transition_time: Some(now.clone()),
743    });
744
745    conditions.push(ConnectCondition {
746        condition_type: "SourcesHealthy".to_string(),
747        status: if sources_running == sources_total && sources_total > 0 {
748            "True".to_string()
749        } else if sources_running > 0 {
750            "Partial".to_string()
751        } else if sources_total == 0 {
752            "N/A".to_string()
753        } else {
754            "False".to_string()
755        },
756        reason: Some(format!(
757            "{}/{} sources running",
758            sources_running, sources_total
759        )),
760        message: None,
761        last_transition_time: Some(now.clone()),
762    });
763
764    conditions.push(ConnectCondition {
765        condition_type: "SinksHealthy".to_string(),
766        status: if sinks_running == sinks_total && sinks_total > 0 {
767            "True".to_string()
768        } else if sinks_running > 0 {
769            "Partial".to_string()
770        } else if sinks_total == 0 {
771            "N/A".to_string()
772        } else {
773            "False".to_string()
774        },
775        reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
776        message: None,
777        last_transition_time: Some(now.clone()),
778    });
779
780    // Build connector statuses
781    let mut connector_statuses = Vec::new();
782
783    for source in &spec.sources {
784        connector_statuses.push(ConnectorStatus {
785            name: source.name.clone(),
786            connector_type: "source".to_string(),
787            kind: source.connector.clone(),
788            state: if source.enabled && phase == ConnectPhase::Running {
789                "running".to_string()
790            } else if !source.enabled {
791                "disabled".to_string()
792            } else {
793                "pending".to_string()
794            },
795            events_processed: 0, // Would need to query metrics
796            last_error: None,
797            last_success_time: None,
798        });
799    }
800
801    for sink in &spec.sinks {
802        connector_statuses.push(ConnectorStatus {
803            name: sink.name.clone(),
804            connector_type: "sink".to_string(),
805            kind: sink.connector.clone(),
806            state: if sink.enabled && phase == ConnectPhase::Running {
807                "running".to_string()
808            } else if !sink.enabled {
809                "disabled".to_string()
810            } else {
811                "pending".to_string()
812            },
813            events_processed: 0,
814            last_error: None,
815            last_success_time: None,
816        });
817    }
818
819    RivvenConnectStatus {
820        phase,
821        replicas,
822        ready_replicas,
823        sources_running,
824        sinks_running,
825        sources_total,
826        sinks_total,
827        observed_generation: connect.metadata.generation.unwrap_or(0),
828        conditions,
829        connector_statuses,
830        last_updated: Some(now),
831        message: None,
832    }
833}
834
835/// Build a failed status
836fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
837    let now = Utc::now().to_rfc3339();
838
839    RivvenConnectStatus {
840        phase: ConnectPhase::Failed,
841        replicas: 0,
842        ready_replicas: 0,
843        sources_running: 0,
844        sinks_running: 0,
845        sources_total: connect.spec.sources.len() as i32,
846        sinks_total: connect.spec.sinks.len() as i32,
847        observed_generation: connect.metadata.generation.unwrap_or(0),
848        conditions: vec![ConnectCondition {
849            condition_type: "Ready".to_string(),
850            status: "False".to_string(),
851            reason: Some("ValidationFailed".to_string()),
852            message: Some(error_msg.to_string()),
853            last_transition_time: Some(now.clone()),
854        }],
855        connector_statuses: vec![],
856        last_updated: Some(now),
857        message: Some(error_msg.to_string()),
858    }
859}
860
861/// Update the connect status subresource
862async fn update_connect_status(
863    client: &Client,
864    namespace: &str,
865    name: &str,
866    status: RivvenConnectStatus,
867) -> Result<()> {
868    let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
869
870    debug!(name = %name, phase = ?status.phase, "Updating connect status");
871
872    let patch = serde_json::json!({
873        "status": status
874    });
875
876    let patch_params = PatchParams::default();
877    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
878        .await
879        .map_err(OperatorError::from)?;
880
881    Ok(())
882}
883
884/// Cleanup resources when connect is deleted
885#[instrument(skip(connect, _ctx))]
886async fn cleanup_connect(
887    connect: Arc<RivvenConnect>,
888    _ctx: Arc<ConnectControllerContext>,
889) -> Result<Action> {
890    let name = connect.name_any();
891    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
892
893    info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
894
895    // Resources with owner references will be garbage collected automatically
896    // Additional cleanup could include:
897    // 1. Graceful connector shutdown
898    // 2. Offset commit for sinks
899    // 3. External resource cleanup
900
901    info!(name = %name, "Connect cleanup complete");
902
903    Ok(Action::await_change())
904}
905
906/// Error policy for the connect controller
907fn connect_error_policy(
908    _connect: Arc<RivvenConnect>,
909    error: &OperatorError,
910    _ctx: Arc<ConnectControllerContext>,
911) -> Action {
912    warn!(
913        error = %error,
914        "Connect reconciliation error, will retry"
915    );
916
917    let delay = error
918        .requeue_delay()
919        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
920
921    Action::requeue(delay)
922}
923
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
        SourceConnectorSpec, SourceTopicConfigSpec,
    };

    /// Fixture: a RivvenConnect with 2 replicas, one enabled `datagen` source
    /// and one enabled `stdout` sink, at generation 1.
    fn create_test_connect() -> RivvenConnect {
        RivvenConnect {
            metadata: ObjectMeta {
                name: Some("test-connect".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenConnectSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                replicas: 2,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                resources: None,
                config: ConnectConfigSpec::default(),
                sources: vec![SourceConnectorSpec {
                    name: "test-source".to_string(),
                    connector: "datagen".to_string(),
                    topic: "test-topic".to_string(),
                    topic_routing: None,
                    enabled: true,
                    postgres_cdc: None,
                    mysql_cdc: None,
                    http: None,
                    datagen: None,
                    kafka: None,
                    mqtt: None,
                    sqs: None,
                    pubsub: None,
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    topic_config: SourceTopicConfigSpec::default(),
                }],
                sinks: vec![SinkConnectorSpec {
                    name: "test-sink".to_string(),
                    connector: "stdout".to_string(),
                    topics: vec!["test-topic".to_string()],
                    consumer_group: "test-group".to_string(),
                    enabled: true,
                    start_offset: "latest".to_string(),
                    s3: None,
                    http: None,
                    stdout: None,
                    kafka: None,
                    gcs: None,
                    azure_blob: None,
                    snowflake: None,
                    bigquery: None,
                    redshift: None,
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    rate_limit: RateLimitSpec::default(),
                }],
                settings: GlobalConnectSettings::default(),
                tls: ConnectTlsSpec::default(),
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                affinity: None,
                service_account: None,
                security_context: None,
                container_security_context: None,
            },
            status: None,
        }
    }

    /// Deployment status with every one of the fixture's 2 replicas ready and updated.
    fn ready_deployment_status() -> k8s_openapi::api::apps::v1::DeploymentStatus {
        k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        }
    }

    #[test]
    fn test_build_connect_status_pending() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, None);

        assert_eq!(status.phase, ConnectPhase::Pending);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.sources_total, 1);
        assert_eq!(status.sinks_total, 1);
    }

    #[test]
    fn test_build_connect_status_running() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, Some(ready_deployment_status()));

        assert_eq!(status.phase, ConnectPhase::Running);
        assert_eq!(status.replicas, 2);
        assert_eq!(status.ready_replicas, 2);
        assert_eq!(status.sources_running, 1);
        assert_eq!(status.sinks_running, 1);
    }

    #[test]
    fn test_build_connect_status_degraded() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(1),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Degraded);
    }

    #[test]
    fn test_build_failed_status() {
        let connect = create_test_connect();
        let status = build_failed_status(&connect, "Test error");

        assert_eq!(status.phase, ConnectPhase::Failed);
        assert_eq!(status.message, Some("Test error".to_string()));
        assert_eq!(status.conditions.len(), 1);
        assert_eq!(status.conditions[0].status, "False");

        // Nothing is running, but totals and generation still reflect the spec.
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.sources_running, 0);
        assert_eq!(status.sinks_running, 0);
        assert_eq!(status.sources_total, 1);
        assert_eq!(status.sinks_total, 1);
        assert_eq!(status.observed_generation, 1);
        assert!(status.connector_statuses.is_empty());

        let ready_cond = &status.conditions[0];
        assert_eq!(ready_cond.condition_type, "Ready");
        assert_eq!(ready_cond.reason.as_deref(), Some("ValidationFailed"));
        assert_eq!(ready_cond.message.as_deref(), Some("Test error"));
    }

    #[test]
    fn test_build_pipeline_yaml() {
        let connect = create_test_connect();
        let yaml = build_pipeline_yaml(&connect.spec).unwrap();

        assert!(yaml.contains("version: \"1.0\""));
        assert!(yaml.contains("bootstrap_servers:"));
        assert!(yaml.contains("rivven-test-cluster-client"));
        assert!(yaml.contains("sources:"));
        assert!(yaml.contains("test-source:"));
        assert!(yaml.contains("sinks:"));
        assert!(yaml.contains("test-sink:"));
    }

    #[test]
    fn test_build_connect_configmap() {
        let connect = create_test_connect();
        let cm = build_connect_configmap(&connect).unwrap();

        assert_eq!(
            cm.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert!(cm.data.unwrap().contains_key("pipeline.yaml"));
    }

    #[test]
    fn test_build_connect_deployment() {
        let connect = create_test_connect();
        let deployment = build_connect_deployment(&connect).unwrap();

        assert_eq!(
            deployment.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(2));
    }

    #[test]
    fn test_build_connect_service() {
        let connect = create_test_connect();
        let service = build_connect_service(&connect);

        assert_eq!(
            service.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2);
    }

    #[test]
    fn test_connector_statuses() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, Some(ready_deployment_status()));

        assert_eq!(status.connector_statuses.len(), 2);

        let source_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-source")
            .unwrap();
        assert_eq!(source_status.connector_type, "source");
        assert_eq!(source_status.state, "running");

        let sink_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-sink")
            .unwrap();
        assert_eq!(sink_status.connector_type, "sink");
        assert_eq!(sink_status.state, "running");
    }

    #[test]
    fn test_connector_statuses_pending_without_deployment() {
        // With no deployment the phase is Pending, so enabled connectors are "pending".
        let connect = create_test_connect();
        let status = build_connect_status(&connect, None);

        assert_eq!(status.phase, ConnectPhase::Pending);
        assert_eq!(status.connector_statuses.len(), 2);
        assert!(status
            .connector_statuses
            .iter()
            .all(|s| s.state == "pending"));
    }

    #[test]
    fn test_disabled_connector_reported_as_disabled() {
        // A disabled connector reports "disabled" regardless of the phase.
        let mut connect = create_test_connect();
        connect.spec.sources[0].enabled = false;

        let status = build_connect_status(&connect, Some(ready_deployment_status()));

        let source_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-source")
            .unwrap();
        assert_eq!(source_status.connector_type, "source");
        assert_eq!(source_status.state, "disabled");
    }

    #[test]
    fn test_conditions() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, Some(ready_deployment_status()));

        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");

        let broker_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "BrokerConnected")
            .unwrap();
        assert_eq!(broker_cond.status, "True");
    }
}