Skip to main content

rivven_operator/
connect_controller.rs

1//! RivvenConnect Controller
2//!
3//! This module implements the Kubernetes controller for managing RivvenConnect
4//! custom resources. It watches for changes and reconciles connector pipelines.
5
6use crate::crd::{
7    ClusterReference, ConnectCondition, ConnectPhase, ConnectorStatus, RivvenConnect,
8    RivvenConnectSpec, RivvenConnectStatus,
9};
10use crate::error::{OperatorError, Result};
11use chrono::Utc;
12use futures::StreamExt;
13use k8s_openapi::api::apps::v1::Deployment;
14use k8s_openapi::api::core::v1::{ConfigMap, Service};
15use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
16use kube::api::{Api, Patch, PatchParams};
17use kube::runtime::controller::{Action, Controller};
18use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
19use kube::runtime::watcher::Config;
20use kube::{Client, Resource, ResourceExt};
21use std::collections::BTreeMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tracing::{debug, error, info, instrument, warn};
25use validator::Validate;
26
/// Finalizer placed on every RivvenConnect so cleanup runs before the object is removed.
pub const CONNECT_FINALIZER: &str = "rivven.hupe1980.github.io/connect-finalizer";

/// Delay, in seconds, before re-reconciling after a successful pass.
const DEFAULT_REQUEUE_SECONDS: u64 = 60;

/// Delay, in seconds, before retrying after a failed pass.
const ERROR_REQUEUE_SECONDS: u64 = 30;
35
36/// Context passed to the connect controller
37pub struct ConnectControllerContext {
38    /// Kubernetes client
39    pub client: Client,
40    /// Metrics recorder
41    pub metrics: Option<ConnectControllerMetrics>,
42}
43
44/// Metrics for the connect controller
45#[derive(Clone)]
46pub struct ConnectControllerMetrics {
47    /// Counter for reconciliation attempts
48    pub reconciliations: metrics::Counter,
49    /// Counter for reconciliation errors
50    pub errors: metrics::Counter,
51    /// Histogram for reconciliation duration
52    pub duration: metrics::Histogram,
53}
54
55impl ConnectControllerMetrics {
56    /// Create new connect controller metrics
57    pub fn new() -> Self {
58        Self {
59            reconciliations: metrics::counter!("rivven_connect_reconciliations_total"),
60            errors: metrics::counter!("rivven_connect_reconciliation_errors_total"),
61            duration: metrics::histogram!("rivven_connect_reconciliation_duration_seconds"),
62        }
63    }
64}
65
66impl Default for ConnectControllerMetrics {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72/// Start the RivvenConnect controller
73pub async fn run_connect_controller(client: Client, namespace: Option<String>) -> Result<()> {
74    let connects: Api<RivvenConnect> = match &namespace {
75        Some(ns) => Api::namespaced(client.clone(), ns),
76        None => Api::all(client.clone()),
77    };
78
79    let ctx = Arc::new(ConnectControllerContext {
80        client: client.clone(),
81        metrics: Some(ConnectControllerMetrics::new()),
82    });
83
84    info!(
85        namespace = namespace.as_deref().unwrap_or("all"),
86        "Starting RivvenConnect controller"
87    );
88
89    // Watch related resources for changes
90    let deployments = match &namespace {
91        Some(ns) => Api::<Deployment>::namespaced(client.clone(), ns),
92        None => Api::<Deployment>::all(client.clone()),
93    };
94
95    let configmaps = match &namespace {
96        Some(ns) => Api::<ConfigMap>::namespaced(client.clone(), ns),
97        None => Api::<ConfigMap>::all(client.clone()),
98    };
99
100    Controller::new(connects.clone(), Config::default())
101        .owns(deployments, Config::default())
102        .owns(configmaps, Config::default())
103        .run(reconcile_connect, connect_error_policy, ctx)
104        .for_each(|result| async move {
105            match result {
106                Ok((obj, action)) => {
107                    debug!(
108                        name = obj.name,
109                        namespace = obj.namespace,
110                        ?action,
111                        "Connect reconciliation completed"
112                    );
113                }
114                Err(e) => {
115                    error!(error = %e, "Connect reconciliation failed");
116                }
117            }
118        })
119        .await;
120
121    Ok(())
122}
123
124/// Main reconciliation function for RivvenConnect
125#[instrument(skip(connect, ctx), fields(name = %connect.name_any(), namespace = connect.namespace()))]
126async fn reconcile_connect(
127    connect: Arc<RivvenConnect>,
128    ctx: Arc<ConnectControllerContext>,
129) -> Result<Action> {
130    let start = std::time::Instant::now();
131
132    if let Some(ref metrics) = ctx.metrics {
133        metrics.reconciliations.increment(1);
134    }
135
136    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
137    let connects: Api<RivvenConnect> = Api::namespaced(ctx.client.clone(), &namespace);
138
139    let result = finalizer(&connects, CONNECT_FINALIZER, connect, |event| async {
140        match event {
141            FinalizerEvent::Apply(connect) => apply_connect(connect, ctx.clone()).await,
142            FinalizerEvent::Cleanup(connect) => cleanup_connect(connect, ctx.clone()).await,
143        }
144    })
145    .await;
146
147    if let Some(ref metrics) = ctx.metrics {
148        metrics.duration.record(start.elapsed().as_secs_f64());
149    }
150
151    result.map_err(|e| {
152        if let Some(ref metrics) = ctx.metrics {
153            metrics.errors.increment(1);
154        }
155        OperatorError::ReconcileFailed(e.to_string())
156    })
157}
158
159/// Apply (create/update) the connect resources
160#[instrument(skip(connect, ctx))]
161async fn apply_connect(
162    connect: Arc<RivvenConnect>,
163    ctx: Arc<ConnectControllerContext>,
164) -> Result<Action> {
165    let name = connect.name_any();
166    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
167
168    info!(name = %name, namespace = %namespace, "Reconciling RivvenConnect");
169
170    // Validate the connect spec
171    if let Err(errors) = connect.spec.validate() {
172        let error_messages: Vec<String> = errors
173            .field_errors()
174            .iter()
175            .flat_map(|(field, errs)| {
176                errs.iter()
177                    .map(move |e| format!("{}: {:?}", field, e.message))
178            })
179            .collect();
180        let error_msg = error_messages.join("; ");
181        warn!(name = %name, errors = %error_msg, "Connect spec validation failed");
182
183        // Update status to failed
184        update_connect_status(
185            &ctx.client,
186            &namespace,
187            &name,
188            build_failed_status(&connect, &error_msg),
189        )
190        .await?;
191
192        return Err(OperatorError::InvalidConfig(error_msg));
193    }
194
195    // Verify cluster reference exists
196    verify_cluster_ref(&ctx.client, &namespace, &connect.spec.cluster_ref).await?;
197
198    // Build and apply ConfigMap with pipeline configuration
199    let configmap = build_connect_configmap(&connect)?;
200    apply_connect_configmap(&ctx.client, &namespace, configmap).await?;
201
202    // Build and apply Deployment
203    let deployment = build_connect_deployment(&connect)?;
204    let deploy_status = apply_connect_deployment(&ctx.client, &namespace, deployment).await?;
205
206    // Build and apply headless Service for connect workers
207    let service = build_connect_service(&connect)?;
208    apply_connect_service(&ctx.client, &namespace, service).await?;
209
210    // Update connect status
211    let status = build_connect_status(&connect, deploy_status);
212    update_connect_status(&ctx.client, &namespace, &name, status).await?;
213
214    info!(name = %name, "Connect reconciliation complete");
215
216    Ok(Action::requeue(Duration::from_secs(
217        DEFAULT_REQUEUE_SECONDS,
218    )))
219}
220
221/// Verify the referenced RivvenCluster exists
222async fn verify_cluster_ref(
223    client: &Client,
224    namespace: &str,
225    cluster_ref: &ClusterReference,
226) -> Result<()> {
227    let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228    let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230    match clusters.get(&cluster_ref.name).await {
231        Ok(_) => Ok(()),
232        Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233            format!("{}/{}", cluster_ns, cluster_ref.name),
234        )),
235        Err(e) => Err(OperatorError::from(e)),
236    }
237}
238
239/// Build ConfigMap containing the connect pipeline configuration
240fn build_connect_configmap(connect: &RivvenConnect) -> Result<ConfigMap> {
241    let name = connect.name_any();
242    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
243
244    // Build pipeline YAML configuration
245    let pipeline_config = build_pipeline_yaml(&connect.spec)?;
246
247    let mut labels = BTreeMap::new();
248    labels.insert(
249        "app.kubernetes.io/name".to_string(),
250        "rivven-connect".to_string(),
251    );
252    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
253    labels.insert(
254        "app.kubernetes.io/component".to_string(),
255        "connect".to_string(),
256    );
257    labels.insert(
258        "app.kubernetes.io/managed-by".to_string(),
259        "rivven-operator".to_string(),
260    );
261
262    let mut data = BTreeMap::new();
263    data.insert("pipeline.yaml".to_string(), pipeline_config);
264
265    Ok(ConfigMap {
266        metadata: ObjectMeta {
267            name: Some(format!("rivven-connect-{}", name)),
268            namespace: Some(namespace),
269            labels: Some(labels),
270            owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
271                || {
272                    OperatorError::InvalidConfig(
273                        "Failed to generate owner reference for ConfigMap".into(),
274                    )
275                },
276            )?]),
277            ..Default::default()
278        },
279        data: Some(data),
280        ..Default::default()
281    })
282}
283
284/// Build the pipeline YAML from the spec
285///
286/// Constructs the pipeline configuration as a `serde_json::Value` tree and
287/// serializes it with `serde_yaml` instead of manual string interpolation.
288/// This guarantees valid YAML output regardless of user-provided values
289/// (special characters, newlines, YAML metacharacters, etc.).
290fn build_pipeline_yaml(spec: &RivvenConnectSpec) -> Result<String> {
291    let mut pipeline = serde_json::Map::new();
292    pipeline.insert("version".to_string(), serde_json::json!("1.0"));
293
294    // Broker configuration
295    let cluster_ns = spec.cluster_ref.namespace.as_deref().unwrap_or("default");
296    let cluster_svc = format!(
297        "rivven-{}.{}.svc.cluster.local:9092",
298        spec.cluster_ref.name, cluster_ns
299    );
300    pipeline.insert(
301        "broker".to_string(),
302        serde_json::json!({ "bootstrap_servers": [cluster_svc] }),
303    );
304
305    // Global settings
306    if !spec.sources.is_empty() || !spec.sinks.is_empty() {
307        pipeline.insert(
308            "settings".to_string(),
309            serde_json::json!({
310                "topic": {
311                    "auto_create": spec.settings.topic.auto_create,
312                    "default_partitions": spec.settings.topic.default_partitions
313                }
314            }),
315        );
316    }
317
318    // Sources
319    if !spec.sources.is_empty() {
320        let mut sources = serde_json::Map::new();
321        for source in &spec.sources {
322            let mut entry = serde_json::Map::new();
323            entry.insert("connector".to_string(), serde_json::json!(source.connector));
324            entry.insert("topic".to_string(), serde_json::json!(source.topic));
325            entry.insert("enabled".to_string(), serde_json::json!(source.enabled));
326
327            // Merge topic_routing into config for CDC connectors
328            let mut config = source.config.clone();
329            if let Some(ref routing) = source.topic_routing {
330                if let serde_json::Value::Object(ref mut map) = config {
331                    map.insert("topic_routing".to_string(), serde_json::json!(routing));
332                } else if config.is_null() {
333                    config = serde_json::json!({"topic_routing": routing});
334                }
335            }
336
337            if !config.is_null() {
338                entry.insert("config".to_string(), config);
339            }
340
341            sources.insert(source.name.clone(), serde_json::Value::Object(entry));
342        }
343        pipeline.insert("sources".to_string(), serde_json::Value::Object(sources));
344    }
345
346    // Sinks
347    if !spec.sinks.is_empty() {
348        let mut sinks = serde_json::Map::new();
349        for sink in &spec.sinks {
350            let mut entry = serde_json::Map::new();
351            entry.insert("connector".to_string(), serde_json::json!(sink.connector));
352            entry.insert("topics".to_string(), serde_json::json!(sink.topics));
353            entry.insert(
354                "consumer_group".to_string(),
355                serde_json::json!(sink.consumer_group),
356            );
357            entry.insert("enabled".to_string(), serde_json::json!(sink.enabled));
358
359            if !sink.config.is_null() {
360                entry.insert("config".to_string(), sink.config.clone());
361            }
362
363            sinks.insert(sink.name.clone(), serde_json::Value::Object(entry));
364        }
365        pipeline.insert("sinks".to_string(), serde_json::Value::Object(sinks));
366    }
367
368    serde_yaml::to_string(&serde_json::Value::Object(pipeline)).map_err(|e| {
369        OperatorError::InvalidConfig(format!("failed to serialize pipeline YAML: {e}"))
370    })
371}
372
373/// Build the Deployment for connect workers
374fn build_connect_deployment(connect: &RivvenConnect) -> Result<Deployment> {
375    let name = connect.name_any();
376    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
377    let spec = &connect.spec;
378
379    // Immutable selector labels (must not include user pod_labels)
380    let selector_labels: BTreeMap<String, String> = [
381        (
382            "app.kubernetes.io/name".to_string(),
383            "rivven-connect".to_string(),
384        ),
385        ("app.kubernetes.io/instance".to_string(), name.clone()),
386    ]
387    .into();
388
389    let mut labels = BTreeMap::new();
390    labels.insert(
391        "app.kubernetes.io/name".to_string(),
392        "rivven-connect".to_string(),
393    );
394    labels.insert("app.kubernetes.io/instance".to_string(), name.clone());
395    labels.insert(
396        "app.kubernetes.io/component".to_string(),
397        "connect".to_string(),
398    );
399    labels.insert(
400        "app.kubernetes.io/managed-by".to_string(),
401        "rivven-operator".to_string(),
402    );
403
404    // Merge user labels (only into template labels, not selector)
405    for (k, v) in &spec.pod_labels {
406        if !k.starts_with("app.kubernetes.io/") {
407            labels.insert(k.clone(), v.clone());
408        }
409    }
410
411    let image = spec
412        .image
413        .clone()
414        .unwrap_or_else(|| format!("ghcr.io/hupe1980/rivven-connect:{}", spec.version));
415
416    // Build container
417    let container = k8s_openapi::api::core::v1::Container {
418        name: "connect".to_string(),
419        image: Some(image),
420        image_pull_policy: Some(spec.image_pull_policy.clone()),
421        args: Some(vec![
422            "run".to_string(),
423            "--config".to_string(),
424            "/config/pipeline.yaml".to_string(),
425        ]),
426        env: Some(spec.env.clone()),
427        volume_mounts: Some(vec![
428            k8s_openapi::api::core::v1::VolumeMount {
429                name: "config".to_string(),
430                mount_path: "/config".to_string(),
431                read_only: Some(true),
432                ..Default::default()
433            },
434            k8s_openapi::api::core::v1::VolumeMount {
435                name: "data".to_string(),
436                mount_path: "/data".to_string(),
437                ..Default::default()
438            },
439        ]),
440        resources: spec
441            .resources
442            .as_ref()
443            .map(|r| serde_json::from_value(r.clone()))
444            .transpose()
445            .map_err(|e| {
446                OperatorError::InvalidConfig(format!("invalid resources config: {}", e))
447            })?,
448        security_context: {
449            let custom = spec
450                .container_security_context
451                .as_ref()
452                .map(|sc| serde_json::from_value(sc.clone()))
453                .transpose()
454                .map_err(|e| {
455                    OperatorError::InvalidConfig(format!(
456                        "invalid container security context: {}",
457                        e
458                    ))
459                })?;
460            Some(
461                custom.unwrap_or_else(|| k8s_openapi::api::core::v1::SecurityContext {
462                    allow_privilege_escalation: Some(false),
463                    read_only_root_filesystem: Some(true),
464                    run_as_non_root: Some(true),
465                    run_as_user: Some(1000),
466                    run_as_group: Some(1000),
467                    capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
468                        drop: Some(vec!["ALL".to_string()]),
469                        ..Default::default()
470                    }),
471                    seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
472                        type_: "RuntimeDefault".to_string(),
473                        ..Default::default()
474                    }),
475                    ..Default::default()
476                }),
477            )
478        },
479        liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
480            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
481                path: Some("/health".to_string()),
482                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
483                ..Default::default()
484            }),
485            initial_delay_seconds: Some(30),
486            period_seconds: Some(10),
487            ..Default::default()
488        }),
489        readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
490            http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
491                path: Some("/ready".to_string()),
492                port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
493                ..Default::default()
494            }),
495            initial_delay_seconds: Some(10),
496            period_seconds: Some(5),
497            ..Default::default()
498        }),
499        ports: Some(vec![
500            k8s_openapi::api::core::v1::ContainerPort {
501                name: Some("http".to_string()),
502                container_port: 8080,
503                ..Default::default()
504            },
505            k8s_openapi::api::core::v1::ContainerPort {
506                name: Some("metrics".to_string()),
507                container_port: 9090,
508                ..Default::default()
509            },
510        ]),
511        ..Default::default()
512    };
513
514    // Build volumes
515    let volumes = vec![
516        k8s_openapi::api::core::v1::Volume {
517            name: "config".to_string(),
518            config_map: Some(k8s_openapi::api::core::v1::ConfigMapVolumeSource {
519                name: format!("rivven-connect-{}", name),
520                ..Default::default()
521            }),
522            ..Default::default()
523        },
524        k8s_openapi::api::core::v1::Volume {
525            name: "data".to_string(),
526            empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource::default()),
527            ..Default::default()
528        },
529    ];
530
531    // Build image pull secrets
532    let image_pull_secrets: Option<Vec<_>> = if spec.image_pull_secrets.is_empty() {
533        None
534    } else {
535        Some(
536            spec.image_pull_secrets
537                .iter()
538                .map(|s| k8s_openapi::api::core::v1::LocalObjectReference { name: s.clone() })
539                .collect(),
540        )
541    };
542
543    let pod_spec = k8s_openapi::api::core::v1::PodSpec {
544        containers: vec![container],
545        volumes: Some(volumes),
546        image_pull_secrets,
547        service_account_name: spec.service_account.clone(),
548        node_selector: if spec.node_selector.is_empty() {
549            None
550        } else {
551            Some(spec.node_selector.clone())
552        },
553        tolerations: if spec.tolerations.is_empty() {
554            None
555        } else {
556            Some(spec.tolerations.clone())
557        },
558        affinity: spec
559            .affinity
560            .as_ref()
561            .map(|a| serde_json::from_value(a.clone()))
562            .transpose()
563            .map_err(|e| OperatorError::InvalidConfig(format!("invalid affinity config: {}", e)))?,
564        security_context: {
565            let custom = spec
566                .security_context
567                .as_ref()
568                .map(|sc| serde_json::from_value(sc.clone()))
569                .transpose()
570                .map_err(|e| {
571                    OperatorError::InvalidConfig(format!("invalid pod security context: {}", e))
572                })?;
573            Some(
574                custom.unwrap_or_else(|| k8s_openapi::api::core::v1::PodSecurityContext {
575                    run_as_non_root: Some(true),
576                    run_as_user: Some(1000),
577                    run_as_group: Some(1000),
578                    fs_group: Some(1000),
579                    seccomp_profile: Some(k8s_openapi::api::core::v1::SeccompProfile {
580                        type_: "RuntimeDefault".to_string(),
581                        ..Default::default()
582                    }),
583                    ..Default::default()
584                }),
585            )
586        },
587        automount_service_account_token: Some(false),
588        ..Default::default()
589    };
590
591    Ok(Deployment {
592        metadata: ObjectMeta {
593            name: Some(format!("rivven-connect-{}", name)),
594            namespace: Some(namespace),
595            labels: Some(labels.clone()),
596            owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
597                || {
598                    OperatorError::InvalidConfig(
599                        "Failed to generate owner reference for Deployment".into(),
600                    )
601                },
602            )?]),
603            ..Default::default()
604        },
605        spec: Some(k8s_openapi::api::apps::v1::DeploymentSpec {
606            replicas: Some(spec.replicas),
607            selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
608                match_labels: Some(selector_labels),
609                ..Default::default()
610            },
611            template: k8s_openapi::api::core::v1::PodTemplateSpec {
612                metadata: Some(ObjectMeta {
613                    labels: Some(labels),
614                    annotations: Some(spec.pod_annotations.clone()),
615                    ..Default::default()
616                }),
617                spec: Some(pod_spec),
618            },
619            ..Default::default()
620        }),
621        ..Default::default()
622    })
623}
624
625/// Build the Service for connect workers
626fn build_connect_service(connect: &RivvenConnect) -> Result<Service> {
627    let name = connect.name_any();
628    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
629
630    // Selector labels must match the Deployment selector (immutable 2-label set)
631    let selector_labels: BTreeMap<String, String> = [
632        (
633            "app.kubernetes.io/name".to_string(),
634            "rivven-connect".to_string(),
635        ),
636        ("app.kubernetes.io/instance".to_string(), name.clone()),
637    ]
638    .into();
639
640    let mut labels = selector_labels.clone();
641    labels.insert(
642        "app.kubernetes.io/component".to_string(),
643        "connect".to_string(),
644    );
645    labels.insert(
646        "app.kubernetes.io/managed-by".to_string(),
647        "rivven-operator".to_string(),
648    );
649
650    Ok(Service {
651        metadata: ObjectMeta {
652            name: Some(format!("rivven-connect-{}", name)),
653            namespace: Some(namespace),
654            labels: Some(labels),
655            owner_references: Some(vec![connect.controller_owner_ref(&()).ok_or_else(
656                || {
657                    OperatorError::InvalidConfig(
658                        "Failed to generate owner reference for Service".into(),
659                    )
660                },
661            )?]),
662            ..Default::default()
663        },
664        spec: Some(k8s_openapi::api::core::v1::ServiceSpec {
665            selector: Some(selector_labels),
666            ports: Some(vec![
667                k8s_openapi::api::core::v1::ServicePort {
668                    name: Some("http".to_string()),
669                    port: 8080,
670                    target_port: Some(
671                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
672                    ),
673                    ..Default::default()
674                },
675                k8s_openapi::api::core::v1::ServicePort {
676                    name: Some("metrics".to_string()),
677                    port: 9090,
678                    target_port: Some(
679                        k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(9090),
680                    ),
681                    ..Default::default()
682                },
683            ]),
684            ..Default::default()
685        }),
686        ..Default::default()
687    })
688}
689
690/// Verify the operator still owns a resource before force-applying.
691///
692/// Checks whether an existing resource was created by the rivven-operator by
693/// inspecting the `app.kubernetes.io/managed-by` label. If the resource
694/// exists and is managed by a different controller, the apply is rejected.
695fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
696    let labels = existing.meta().labels.as_ref();
697    let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
698    match managed_by {
699        Some(manager) if manager != "rivven-operator" => {
700            let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
701            Err(OperatorError::InvalidConfig(format!(
702                "resource '{}' is managed by '{}', not rivven-operator; \
703                 refusing to force-apply to avoid ownership conflict",
704                name, manager
705            )))
706        }
707        _ => Ok(()),
708    }
709}
710
711/// Apply ConfigMap using server-side apply
712async fn apply_connect_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
713    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
714    let name =
715        cm.metadata.name.as_ref().ok_or_else(|| {
716            OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
717        })?;
718
719    debug!(name = %name, "Applying Connect ConfigMap");
720
721    // Verify ownership before force-applying
722    if let Ok(existing) = api.get(name).await {
723        verify_ownership(&existing)?;
724    }
725
726    let patch_params = PatchParams::apply("rivven-operator").force();
727    api.patch(name, &patch_params, &Patch::Apply(&cm))
728        .await
729        .map_err(OperatorError::from)?;
730
731    Ok(())
732}
733
734/// Apply Deployment using server-side apply
735async fn apply_connect_deployment(
736    client: &Client,
737    namespace: &str,
738    deployment: Deployment,
739) -> Result<Option<k8s_openapi::api::apps::v1::DeploymentStatus>> {
740    let api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
741    let name =
742        deployment.metadata.name.as_ref().ok_or_else(|| {
743            OperatorError::InvalidConfig("Deployment missing metadata.name".into())
744        })?;
745
746    debug!(name = %name, "Applying Connect Deployment");
747
748    // Verify ownership before force-applying
749    if let Ok(existing) = api.get(name).await {
750        verify_ownership(&existing)?;
751    }
752
753    let patch_params = PatchParams::apply("rivven-operator").force();
754    let result = api
755        .patch(name, &patch_params, &Patch::Apply(&deployment))
756        .await
757        .map_err(OperatorError::from)?;
758
759    Ok(result.status)
760}
761
762/// Apply Service using server-side apply
763async fn apply_connect_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
764    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
765    let name = svc
766        .metadata
767        .name
768        .as_ref()
769        .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
770
771    debug!(name = %name, "Applying Connect Service");
772
773    // Verify ownership before force-applying
774    if let Ok(existing) = api.get(name).await {
775        verify_ownership(&existing)?;
776    }
777
778    let patch_params = PatchParams::apply("rivven-operator").force();
779    api.patch(name, &patch_params, &Patch::Apply(&svc))
780        .await
781        .map_err(OperatorError::from)?;
782
783    Ok(())
784}
785
786/// Build connect status from deployment status
787fn build_connect_status(
788    connect: &RivvenConnect,
789    deploy_status: Option<k8s_openapi::api::apps::v1::DeploymentStatus>,
790) -> RivvenConnectStatus {
791    let now = Utc::now().to_rfc3339();
792    let spec = &connect.spec;
793
794    let (replicas, ready_replicas, updated_replicas) = deploy_status
795        .map(|s| {
796            (
797                s.replicas.unwrap_or(0),
798                s.ready_replicas.unwrap_or(0),
799                s.updated_replicas.unwrap_or(0),
800            )
801        })
802        .unwrap_or((0, 0, 0));
803
804    let desired_replicas = spec.replicas;
805
806    // Determine phase
807    let phase = if ready_replicas == 0 {
808        ConnectPhase::Pending
809    } else if ready_replicas < desired_replicas {
810        if updated_replicas < desired_replicas {
811            ConnectPhase::Starting
812        } else {
813            ConnectPhase::Degraded
814        }
815    } else {
816        ConnectPhase::Running
817    };
818
819    // Count configured connectors
820    let sources_total = spec.sources.len() as i32;
821    let sinks_total = spec.sinks.len() as i32;
822
823    // Estimate running connectors (simplified - in production, query the connect workers)
824    let sources_running = if phase == ConnectPhase::Running {
825        spec.sources.iter().filter(|s| s.enabled).count() as i32
826    } else {
827        0
828    };
829    let sinks_running = if phase == ConnectPhase::Running {
830        spec.sinks.iter().filter(|s| s.enabled).count() as i32
831    } else {
832        0
833    };
834
835    // Build conditions
836    let mut conditions = vec![];
837
838    conditions.push(ConnectCondition {
839        condition_type: "Ready".to_string(),
840        status: if ready_replicas >= desired_replicas {
841            "True".to_string()
842        } else {
843            "False".to_string()
844        },
845        reason: Some(format!(
846            "{}/{} replicas ready",
847            ready_replicas, desired_replicas
848        )),
849        message: None,
850        last_transition_time: Some(now.clone()),
851    });
852
853    conditions.push(ConnectCondition {
854        condition_type: "BrokerConnected".to_string(),
855        status: if phase == ConnectPhase::Running {
856            "True".to_string()
857        } else {
858            "Unknown".to_string()
859        },
860        reason: Some("ClusterRefValid".to_string()),
861        message: None,
862        last_transition_time: Some(now.clone()),
863    });
864
865    conditions.push(ConnectCondition {
866        condition_type: "SourcesHealthy".to_string(),
867        status: if sources_running == sources_total && sources_total > 0 {
868            "True".to_string()
869        } else if sources_running > 0 {
870            "Partial".to_string()
871        } else if sources_total == 0 {
872            "N/A".to_string()
873        } else {
874            "False".to_string()
875        },
876        reason: Some(format!(
877            "{}/{} sources running",
878            sources_running, sources_total
879        )),
880        message: None,
881        last_transition_time: Some(now.clone()),
882    });
883
884    conditions.push(ConnectCondition {
885        condition_type: "SinksHealthy".to_string(),
886        status: if sinks_running == sinks_total && sinks_total > 0 {
887            "True".to_string()
888        } else if sinks_running > 0 {
889            "Partial".to_string()
890        } else if sinks_total == 0 {
891            "N/A".to_string()
892        } else {
893            "False".to_string()
894        },
895        reason: Some(format!("{}/{} sinks running", sinks_running, sinks_total)),
896        message: None,
897        last_transition_time: Some(now.clone()),
898    });
899
900    // Build connector statuses
901    let mut connector_statuses = Vec::new();
902
903    for source in &spec.sources {
904        connector_statuses.push(ConnectorStatus {
905            name: source.name.clone(),
906            connector_type: "source".to_string(),
907            kind: source.connector.clone(),
908            state: if source.enabled && phase == ConnectPhase::Running {
909                "running".to_string()
910            } else if !source.enabled {
911                "disabled".to_string()
912            } else {
913                "pending".to_string()
914            },
915            status_source: "synthetic".to_string(),
916            last_probed: None,
917            events_processed: 0, // Would need to query metrics
918            last_error: None,
919            last_success_time: None,
920        });
921    }
922
923    for sink in &spec.sinks {
924        connector_statuses.push(ConnectorStatus {
925            name: sink.name.clone(),
926            connector_type: "sink".to_string(),
927            kind: sink.connector.clone(),
928            state: if sink.enabled && phase == ConnectPhase::Running {
929                "running".to_string()
930            } else if !sink.enabled {
931                "disabled".to_string()
932            } else {
933                "pending".to_string()
934            },
935            status_source: "synthetic".to_string(),
936            last_probed: None,
937            events_processed: 0,
938            last_error: None,
939            last_success_time: None,
940        });
941    }
942
943    RivvenConnectStatus {
944        phase,
945        replicas,
946        ready_replicas,
947        sources_running,
948        sinks_running,
949        sources_total,
950        sinks_total,
951        observed_generation: connect.metadata.generation.unwrap_or(0),
952        conditions,
953        connector_statuses,
954        last_updated: Some(now),
955        message: None,
956    }
957}
958
959/// Build a failed status
960fn build_failed_status(connect: &RivvenConnect, error_msg: &str) -> RivvenConnectStatus {
961    let now = Utc::now().to_rfc3339();
962
963    RivvenConnectStatus {
964        phase: ConnectPhase::Failed,
965        replicas: 0,
966        ready_replicas: 0,
967        sources_running: 0,
968        sinks_running: 0,
969        sources_total: connect.spec.sources.len() as i32,
970        sinks_total: connect.spec.sinks.len() as i32,
971        observed_generation: connect.metadata.generation.unwrap_or(0),
972        conditions: vec![ConnectCondition {
973            condition_type: "Ready".to_string(),
974            status: "False".to_string(),
975            reason: Some("ValidationFailed".to_string()),
976            message: Some(error_msg.to_string()),
977            last_transition_time: Some(now.clone()),
978        }],
979        connector_statuses: vec![],
980        last_updated: Some(now),
981        message: Some(error_msg.to_string()),
982    }
983}
984
985/// Update the connect status subresource
986async fn update_connect_status(
987    client: &Client,
988    namespace: &str,
989    name: &str,
990    status: RivvenConnectStatus,
991) -> Result<()> {
992    let api: Api<RivvenConnect> = Api::namespaced(client.clone(), namespace);
993
994    debug!(name = %name, phase = ?status.phase, "Updating connect status");
995
996    let patch = serde_json::json!({
997        "status": status
998    });
999
1000    let patch_params = PatchParams::default();
1001    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
1002        .await
1003        .map_err(OperatorError::from)?;
1004
1005    Ok(())
1006}
1007
1008/// Cleanup resources when connect is deleted
1009#[instrument(skip(connect, ctx))]
1010async fn cleanup_connect(
1011    connect: Arc<RivvenConnect>,
1012    ctx: Arc<ConnectControllerContext>,
1013) -> Result<Action> {
1014    let name = connect.name_any();
1015    let namespace = connect.namespace().unwrap_or_else(|| "default".to_string());
1016
1017    info!(name = %name, namespace = %namespace, "Cleaning up RivvenConnect resources");
1018
1019    // If this connect CR had sink connectors, set status to Terminating so
1020    // the connect workers can attempt a final offset commit before shutdown.
1021    let has_sinks = !connect.spec.sinks.is_empty();
1022    if has_sinks {
1023        info!(name = %name, "Sink connectors detected — setting Terminating status for final offset commit");
1024        let draining_status = RivvenConnectStatus {
1025            phase: ConnectPhase::Terminating,
1026            replicas: 0,
1027            ready_replicas: 0,
1028            sources_running: 0,
1029            sinks_running: 0,
1030            sources_total: connect.spec.sources.len() as i32,
1031            sinks_total: connect.spec.sinks.len() as i32,
1032            observed_generation: connect.metadata.generation.unwrap_or(0),
1033            conditions: vec![],
1034            connector_statuses: vec![],
1035            last_updated: Some(Utc::now().to_rfc3339()),
1036            message: Some("Draining sink connectors before deletion".to_string()),
1037        };
1038        if let Err(e) = update_connect_status(&ctx.client, &namespace, &name, draining_status).await
1039        {
1040            warn!(name = %name, error = %e, "Failed to set Terminating status — continuing cleanup");
1041        }
1042    }
1043
1044    // Delete operator-owned ConfigMap: rivven-connect-{name}
1045    let cm_name = format!("rivven-connect-{}", name);
1046    let configmaps: Api<ConfigMap> = Api::namespaced(ctx.client.clone(), &namespace);
1047    match configmaps.delete(&cm_name, &Default::default()).await {
1048        Ok(_) => info!(name = %name, configmap = %cm_name, "Deleted operator-owned ConfigMap"),
1049        Err(kube::Error::Api(ae)) if ae.code == 404 => {
1050            debug!(name = %name, configmap = %cm_name, "ConfigMap already deleted");
1051        }
1052        Err(e) => {
1053            warn!(name = %name, configmap = %cm_name, error = %e, "Failed to delete ConfigMap — continuing cleanup");
1054        }
1055    }
1056
1057    // Delete operator-owned Deployment: rivven-connect-{name}
1058    let deploy_name = format!("rivven-connect-{}", name);
1059    let deployments: Api<Deployment> = Api::namespaced(ctx.client.clone(), &namespace);
1060    match deployments.delete(&deploy_name, &Default::default()).await {
1061        Ok(_) => {
1062            info!(name = %name, deployment = %deploy_name, "Deleted operator-owned Deployment")
1063        }
1064        Err(kube::Error::Api(ae)) if ae.code == 404 => {
1065            debug!(name = %name, deployment = %deploy_name, "Deployment already deleted");
1066        }
1067        Err(e) => {
1068            warn!(name = %name, deployment = %deploy_name, error = %e, "Failed to delete Deployment — continuing cleanup");
1069        }
1070    }
1071
1072    // Delete operator-owned Service: rivven-connect-{name}
1073    let svc_name = format!("rivven-connect-{}", name);
1074    let services: Api<Service> = Api::namespaced(ctx.client.clone(), &namespace);
1075    match services.delete(&svc_name, &Default::default()).await {
1076        Ok(_) => info!(name = %name, service = %svc_name, "Deleted operator-owned Service"),
1077        Err(kube::Error::Api(ae)) if ae.code == 404 => {
1078            debug!(name = %name, service = %svc_name, "Service already deleted");
1079        }
1080        Err(e) => {
1081            warn!(name = %name, service = %svc_name, error = %e, "Failed to delete Service — continuing cleanup");
1082        }
1083    }
1084
1085    info!(name = %name, "Connect cleanup complete");
1086
1087    Ok(Action::await_change())
1088}
1089
1090/// Error policy for the connect controller
1091fn connect_error_policy(
1092    _connect: Arc<RivvenConnect>,
1093    error: &OperatorError,
1094    _ctx: Arc<ConnectControllerContext>,
1095) -> Action {
1096    warn!(
1097        error = %error,
1098        "Connect reconciliation error, will retry"
1099    );
1100
1101    let delay = error
1102        .requeue_delay()
1103        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
1104
1105    Action::requeue(delay)
1106}
1107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        ConnectConfigSpec, ConnectTlsSpec, GlobalConnectSettings, RateLimitSpec, SinkConnectorSpec,
        SourceConnectorSpec, SourceTopicConfigSpec,
    };

    /// A RivvenConnect with 2 desired replicas, one enabled source and one
    /// enabled sink.
    fn create_test_connect() -> RivvenConnect {
        RivvenConnect {
            metadata: ObjectMeta {
                name: Some("test-connect".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenConnectSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                replicas: 2,
                version: "0.0.1".to_string(),
                image: None,
                image_pull_policy: "IfNotPresent".to_string(),
                image_pull_secrets: vec![],
                resources: None,
                config: ConnectConfigSpec::default(),
                sources: vec![SourceConnectorSpec {
                    name: "test-source".to_string(),
                    connector: "datagen".to_string(),
                    topic: "test-topic".to_string(),
                    topic_routing: None,
                    enabled: true,
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    topic_config: SourceTopicConfigSpec::default(),
                }],
                sinks: vec![SinkConnectorSpec {
                    name: "test-sink".to_string(),
                    connector: "stdout".to_string(),
                    topics: vec!["test-topic".to_string()],
                    consumer_group: "test-group".to_string(),
                    enabled: true,
                    start_offset: "latest".to_string(),
                    config: serde_json::Value::Null,
                    config_secret_ref: None,
                    rate_limit: RateLimitSpec::default(),
                }],
                settings: GlobalConnectSettings::default(),
                tls: ConnectTlsSpec::default(),
                pod_annotations: BTreeMap::new(),
                pod_labels: BTreeMap::new(),
                env: vec![],
                node_selector: BTreeMap::new(),
                tolerations: vec![],
                affinity: None,
                service_account: None,
                security_context: None,
                container_security_context: None,
            },
            status: None,
        }
    }

    #[test]
    fn test_build_connect_status_pending() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, None);

        assert_eq!(status.phase, ConnectPhase::Pending);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
        assert_eq!(status.sources_total, 1);
        assert_eq!(status.sinks_total, 1);
    }

    #[test]
    fn test_build_connect_status_running() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Running);
        assert_eq!(status.replicas, 2);
        assert_eq!(status.ready_replicas, 2);
        assert_eq!(status.sources_running, 1);
        assert_eq!(status.sinks_running, 1);
    }

    #[test]
    fn test_build_connect_status_degraded() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(1),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Degraded);
    }

    #[test]
    fn test_build_connect_status_starting() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(1),
            updated_replicas: Some(1),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        // A rollout still in progress is Starting, and no connector counts as
        // running until every desired replica is ready.
        assert_eq!(status.phase, ConnectPhase::Starting);
        assert_eq!(status.sources_running, 0);
        assert_eq!(status.sinks_running, 0);
        assert!(status
            .connector_statuses
            .iter()
            .all(|s| s.state == "pending"));
    }

    #[test]
    fn test_build_failed_status() {
        let connect = create_test_connect();
        let status = build_failed_status(&connect, "Test error");

        assert_eq!(status.phase, ConnectPhase::Failed);
        assert_eq!(status.message, Some("Test error".to_string()));
        assert_eq!(status.conditions.len(), 1);
        assert_eq!(status.conditions[0].status, "False");
    }

    #[test]
    fn test_build_pipeline_yaml() {
        let connect = create_test_connect();
        let yaml = build_pipeline_yaml(&connect.spec).unwrap();

        // serde_yaml serializes the full structure; verify key elements are present
        assert!(yaml.contains("version:"));
        assert!(yaml.contains("1.0"));
        assert!(yaml.contains("bootstrap_servers:"));
        assert!(yaml.contains("rivven-test-cluster"));
        assert!(yaml.contains("sources:"));
        assert!(yaml.contains("test-source"));
        assert!(yaml.contains("sinks:"));
        assert!(yaml.contains("test-sink"));
    }

    #[test]
    fn test_build_connect_configmap() {
        let connect = create_test_connect();
        let cm = build_connect_configmap(&connect).unwrap();

        assert_eq!(
            cm.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert!(cm.data.unwrap().contains_key("pipeline.yaml"));
    }

    #[test]
    fn test_build_connect_deployment() {
        let connect = create_test_connect();
        let deployment = build_connect_deployment(&connect).unwrap();

        assert_eq!(
            deployment.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        assert_eq!(deployment.spec.as_ref().unwrap().replicas, Some(2));
    }

    #[test]
    fn test_build_connect_service() {
        let connect = create_test_connect();
        let service = build_connect_service(&connect).unwrap();

        assert_eq!(
            service.metadata.name,
            Some("rivven-connect-test-connect".to_string())
        );
        let ports = service.spec.as_ref().unwrap().ports.as_ref().unwrap();
        assert_eq!(ports.len(), 2);
    }

    #[test]
    fn test_connector_statuses() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.connector_statuses.len(), 2);

        let source_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-source")
            .unwrap();
        assert_eq!(source_status.connector_type, "source");
        assert_eq!(source_status.state, "running");

        let sink_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-sink")
            .unwrap();
        assert_eq!(sink_status.connector_type, "sink");
        assert_eq!(sink_status.state, "running");
    }

    #[test]
    fn test_disabled_connector_status() {
        let mut connect = create_test_connect();
        connect.spec.sources[0].enabled = false;
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.phase, ConnectPhase::Running);
        assert_eq!(status.sources_running, 0);
        assert_eq!(status.sinks_running, 1);

        let source_status = status
            .connector_statuses
            .iter()
            .find(|s| s.name == "test-source")
            .unwrap();
        assert_eq!(source_status.state, "disabled");

        // One configured source, none running.
        let sources_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "SourcesHealthy")
            .unwrap();
        assert_eq!(sources_cond.status, "False");
    }

    #[test]
    fn test_no_connectors_reports_not_applicable() {
        let mut connect = create_test_connect();
        connect.spec.sources.clear();
        connect.spec.sinks.clear();

        let status = build_connect_status(&connect, None);

        assert_eq!(status.sources_total, 0);
        assert_eq!(status.sinks_total, 0);
        assert!(status.connector_statuses.is_empty());
        for condition_type in ["SourcesHealthy", "SinksHealthy"] {
            let cond = status
                .conditions
                .iter()
                .find(|c| c.condition_type == condition_type)
                .unwrap();
            assert_eq!(cond.status, "N/A");
        }
    }

    #[test]
    fn test_conditions() {
        let connect = create_test_connect();
        let deploy_status = k8s_openapi::api::apps::v1::DeploymentStatus {
            replicas: Some(2),
            ready_replicas: Some(2),
            updated_replicas: Some(2),
            ..Default::default()
        };

        let status = build_connect_status(&connect, Some(deploy_status));

        assert_eq!(status.conditions.len(), 4);

        let ready_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "Ready")
            .unwrap();
        assert_eq!(ready_cond.status, "True");

        let broker_cond = status
            .conditions
            .iter()
            .find(|c| c.condition_type == "BrokerConnected")
            .unwrap();
        assert_eq!(broker_cond.status, "True");
    }

    #[test]
    fn test_pending_conditions() {
        let connect = create_test_connect();
        let status = build_connect_status(&connect, None);

        let condition_status = |condition_type: &str| -> String {
            status
                .conditions
                .iter()
                .find(|c| c.condition_type == condition_type)
                .unwrap()
                .status
                .clone()
        };

        assert_eq!(condition_status("Ready"), "False");
        assert_eq!(condition_status("BrokerConnected"), "Unknown");
        assert_eq!(condition_status("SourcesHealthy"), "False");
        assert_eq!(condition_status("SinksHealthy"), "False");
    }

    #[test]
    fn test_yaml_escape_prevents_injection() {
        // serde_yaml is now used for serialization which handles escaping
        // automatically. Verify that special characters in connector names
        // are safely serialized.
        let mut connect = create_test_connect();
        connect.spec.sources[0].name = "evil\ninjected_key: true".to_string();
        let yaml = build_pipeline_yaml(&connect.spec).unwrap();

        // serde_yaml must properly quote/escape the malicious key.
        // The raw string must NOT appear as a bare top-level key.
        assert!(!yaml.contains("\ninjected_key: true\n"));

        // The YAML must still be parseable
        let parsed: serde_yaml::Value = serde_yaml::from_str(&yaml).unwrap();
        let sources = parsed.get("sources").unwrap().as_mapping().unwrap();
        // The key with newline must be present as a single key, not split
        assert!(sources
            .keys()
            .any(|k| k.as_str() == Some("evil\ninjected_key: true")));
    }

    #[test]
    fn test_build_pipeline_yaml_escapes_values() {
        let mut connect = create_test_connect();
        // Inject a malicious source name with YAML special chars
        connect.spec.sources[0].name = "evil\ninjected_key: true".to_string();
        let yaml = build_pipeline_yaml(&connect.spec).unwrap();
        // The YAML must round-trip correctly
        let parsed: serde_yaml::Value = serde_yaml::from_str(&yaml).unwrap();
        assert!(parsed.get("sources").is_some());
    }
}