gitops_operator/configuration/
configuration.rs

1use crate::files::{needs_patching, patch_deployment};
2use crate::git::{clone_repo, commit_changes, get_latest_commit};
3use crate::notifications::send as send_notification;
4use anyhow::{Context, Error};
5use axum::extract::State as AxumState;
6use k8s_openapi::api::apps::v1::Deployment;
7use k8s_openapi::api::core::v1::Secret;
8use kube::{Api, Client, ResourceExt};
9use std::collections::BTreeMap;
10use std::fs::remove_dir_all;
11use std::path::Path;
12use tracing::{error, info, warn};
13
14use crate::registry::{get_registry_auth_from_secret, RegistryChecker};
15
16use axum::Json;
17use futures::future;
18use kube::runtime::reflector;
19
20type Cache = reflector::Store<Deployment>;
21
22#[derive(serde::Serialize, Clone, Debug, PartialEq)]
23pub enum State {
24    Queued,
25    Processing(String),
26    Success(String),
27    Failure(String),
28}
29
30#[derive(serde::Serialize, Clone, Debug, PartialEq)]
31pub struct Config {
32    pub enabled: bool,
33    pub namespace: String,
34    pub app_repository: String,
35    pub manifest_repository: String,
36    pub image_name: String,
37    pub deployment_path: String,
38    pub observe_branch: String,
39    pub tag_type: String,
40    pub ssh_key_name: String,
41    pub ssh_key_namespace: String,
42    pub notifications_secret_name: Option<String>,
43    pub notifications_secret_namespace: Option<String>,
44    pub registry_url: Option<String>,
45    pub registry_secret_name: Option<String>,
46    pub registry_secret_namespace: Option<String>,
47    pub state: State,
48}
49
50#[derive(serde::Serialize, Clone, Debug, PartialEq)]
51pub struct Entry {
52    pub container: String,
53    pub name: String,
54    pub namespace: String,
55    pub annotations: BTreeMap<String, String>,
56    pub version: String,
57    #[serde(default)]
58    pub config: Config,
59}
60
61impl Entry {
62    pub fn new(d: &Deployment) -> Option<Entry> {
63        let name = d.name_any();
64        let namespace = d.namespace()?;
65        let annotations = d.metadata.annotations.as_ref()?;
66        let tpl = d.spec.as_ref()?.template.spec.as_ref()?;
67        let img = tpl.containers.first()?.image.as_ref()?;
68        let splits = img.splitn(2, ':').collect::<Vec<_>>();
69        let (container, version) = match *splits.as_slice() {
70            [c, v] => (c.to_owned(), v.to_owned()),
71            [c] => (c.to_owned(), "latest".to_owned()),
72            _ => return None,
73        };
74
75        let enabled = annotations
76            .get("gitops.operator.enabled")?
77            .trim()
78            .parse()
79            .unwrap_or(false);
80        let app_repository = annotations
81            .get("gitops.operator.app_repository")?
82            .to_string();
83        let manifest_repository = annotations
84            .get("gitops.operator.manifest_repository")?
85            .to_string();
86        let image_name = annotations.get("gitops.operator.image_name")?.to_string();
87        let deployment_path = annotations
88            .get("gitops.operator.deployment_path")?
89            .to_string();
90        let observe_branch = annotations
91            .get("gitops.operator.observe_branch")
92            .unwrap_or(&"master".to_string())
93            .to_string();
94        let tag_type = annotations
95            .get("gitops.operator.tag_type")
96            .unwrap_or(&"long".to_string())
97            .to_string();
98
99        let tag_type = match tag_type.as_str() {
100            "short" => "short",
101            _ => "long",
102        }
103        .to_string();
104
105        let ssh_key_name = annotations.get("gitops.operator.ssh_key_name")?.to_string();
106        let ssh_key_namespace = annotations
107            .get("gitops.operator.ssh_key_namespace")?
108            .to_string();
109
110        let notifications_secret_name = annotations
111            .get("gitops.operator.notifications_secret_name")
112            .map(|name| name.to_string());
113
114        let notifications_secret_namespace = annotations
115            .get("gitops.operator.notifications_secret_namespace")
116            .map(|name| name.to_string());
117
118        let registry_url = annotations
119            .get("gitops.operator.registry_secret_url")
120            .map(|name| name.to_string());
121
122        let registry_secret_name = annotations
123            .get("gitops.operator.registry_secret_name")
124            .map(|name| name.to_string());
125
126        let registry_secret_namespace = annotations
127            .get("gitops.operator.registry_secret_namespace")
128            .map(|name| name.to_string());
129
130        info!("Processing: {}/{}", &namespace, &name);
131
132        Some(Entry {
133            name,
134            namespace: namespace.clone(),
135            annotations: annotations.clone(),
136            container,
137            version,
138            config: Config {
139                enabled,
140                namespace: namespace.clone(),
141                app_repository,
142                manifest_repository,
143                image_name,
144                deployment_path,
145                observe_branch,
146                tag_type,
147                ssh_key_name,
148                ssh_key_namespace,
149                notifications_secret_name,
150                notifications_secret_namespace,
151                registry_url,
152                registry_secret_name,
153                registry_secret_namespace,
154                state: State::Queued,
155            },
156        })
157    }
158
159    async fn get_ssh_key(self) -> Result<String, Error> {
160        let client = Client::try_default().await?;
161        let secrets: Api<Secret> = Api::namespaced(client, &self.config.ssh_key_namespace);
162        let secret = secrets.get(&self.config.ssh_key_name).await?;
163
164        let secret_data = secret.data.context("Failed to read the data section")?;
165
166        let encoded_key = secret_data
167        .get("ssh-privatekey")
168        .context("Failed to read field: ssh-privatekey in data, consider recreating the secret with kubectl create secret generic name --from-file=ssh-privatekey=/path")?;
169
170        let key_bytes = encoded_key.0.clone();
171
172        String::from_utf8(key_bytes).context("Failed to convert key to string")
173    }
174
175    // TODO: keep refactoring this and the next fn and making it more rusty
176    async fn get_notifications_secret(name: &str, namespace: &str) -> Result<String, Error> {
177        if name.is_empty() {
178            return Ok(String::new());
179        }
180
181        let client = Client::try_default().await?;
182        let secrets: Api<Secret> = Api::namespaced(client, namespace);
183        let secret = secrets.get(name).await?;
184
185        let secret_data = secret.data.context("Failed to read the data section")?;
186
187        let encoded_url = secret_data
188        .get("webhook-url")
189        .context("Failed to read field: webhook-url in data, consider recreating the secret with kubectl create secret generic webhook-secret-name -n your_namespace --from-literal=webhook-url=https://hooks.sl...")?;
190
191        let bytes = encoded_url.0.clone();
192
193        String::from_utf8(bytes).context("Failed to convert key to string")
194    }
195
196    async fn get_notifications_endpoint(&self) -> Option<String> {
197        match Entry::get_notifications_secret(
198            &self
199                .config
200                .notifications_secret_name
201                .clone()
202                .unwrap_or_default(),
203            &self
204                .config
205                .notifications_secret_namespace
206                .clone()
207                .unwrap_or("gitops-operator".to_string()),
208        )
209        .await
210        {
211            Ok(endpoint) => Some(endpoint),
212            Err(e) => {
213                warn!("Failed to get notifications secret: {:?}", e);
214                None
215            }
216        }
217    }
218
219    #[tracing::instrument(name = "process_deployment", skip(self), fields())]
220    pub async fn process_deployment(self) -> State {
221        info!("Processing: {}/{}", &self.namespace, &self.name);
222
223        let endpoint = &self.get_notifications_endpoint().await;
224
225        let ssh_key_secret = match self.clone().get_ssh_key().await {
226            Ok(key) => key,
227            Err(e) => {
228                error!("Failed to get SSH key: {:?}", e);
229                return State::Failure(format!("Failed to get SSH key: {:#?}", e));
230            }
231        };
232
233        let registry_url = self
234            .config
235            .registry_url
236            .as_deref()
237            .unwrap_or("https://index.docker.io/v1/");
238
239        let registry_credentials = get_registry_auth_from_secret(
240            self.config
241                .registry_secret_name
242                .as_deref()
243                .unwrap_or("regcred"),
244            self.config
245                .registry_secret_namespace
246                .as_deref()
247                .unwrap_or("gitops-operator"),
248            registry_url,
249        )
250        .await;
251
252        info!("Creating registry checker for: {}", registry_url);
253        let registry_checker = match registry_credentials {
254            Ok(credentials) => {
255                RegistryChecker::new(registry_url.to_string(), Some(credentials.to_string())).await
256            }
257            Err(e) => {
258                error!("Failed to get registry credentials: {:?}", e);
259                // return State::Failure(format!("Failed to get registry credentials: {:#?}", e));
260                Err(e)
261            }
262        };
263
264        // Start process
265        info!("Performing reconciliation for: {}", &self.name);
266        let app_repo_path = format!("/tmp/app-{}-{}/", &self.name, &self.config.observe_branch);
267        let manifest_repo_path = format!(
268            "/tmp/manifest-{}-{}/",
269            &self.name, &self.config.observe_branch
270        );
271
272        // Create concurrent clone operations
273        info!("Cloning repositories for: {}", &self.name);
274        let app_clone = {
275            let repo = self.config.app_repository.clone();
276            let path = app_repo_path.clone();
277            let branch = self.config.observe_branch.clone();
278            let ssh_key_secret = ssh_key_secret.clone();
279            tokio::task::spawn_blocking(move || clone_repo(&repo, &path, &branch, &ssh_key_secret))
280        };
281
282        let manifest_clone = {
283            let repo = self.config.manifest_repository.clone();
284            let path = manifest_repo_path.clone();
285            let branch = self.config.observe_branch.clone();
286            let ssh_key_secret = ssh_key_secret.clone();
287            tokio::task::spawn_blocking(move || clone_repo(&repo, &path, &branch, &ssh_key_secret))
288        };
289
290        // Wait for both clones to complete
291        if let Err(e) = tokio::try_join!(app_clone, manifest_clone) {
292            error!("Failed to clone repositories: {:?}", e);
293        }
294
295        // Find the latest remote head
296        info!("Getting latest commit for: {}", &self.name);
297        let new_sha = get_latest_commit(
298            Path::new(&app_repo_path),
299            &self.config.observe_branch,
300            &self.config.tag_type,
301            &ssh_key_secret,
302        );
303
304        let new_sha = match new_sha {
305            Ok(sha) => sha,
306            Err(e) => {
307                error!("Failed to get latest SHA: {:?}", e);
308                return State::Failure(format!("Failed to get latest SHA: {:#?}", e));
309            }
310        };
311
312        let deployment_path = format!("{}/{}", &manifest_repo_path, &self.config.deployment_path);
313
314        if needs_patching(&deployment_path, &new_sha).unwrap_or(false) {
315            match patch_deployment(&deployment_path, &self.config.image_name, &new_sha) {
316                Ok(_) => info!("File patched successfully for: {}", &self.name),
317                Err(e) => {
318                    let _ = remove_dir_all(&manifest_repo_path);
319
320                    if endpoint.is_some() {
321                        let message = format!(
322                            "Failed to patch deployment: {} to version: {}",
323                            &self.name, &new_sha
324                        );
325                        match send_notification(&message, endpoint.as_deref()).await {
326                            Ok(_) => info!("Notification sent successfully"),
327                            Err(e) => {
328                                warn!("Failed to send notification: {:?}", e);
329                            }
330                        }
331                    }
332
333                    error!("Failed to patch deployment: {:?}", e);
334                }
335            }
336
337            match commit_changes(&manifest_repo_path, &ssh_key_secret) {
338                Ok(_) => info!("Changes committed successfully"),
339                Err(e) => {
340                    let _ = remove_dir_all(&manifest_repo_path);
341                    error!(
342                        "Failed to commit changes, cleaning up manifests repo for next run: {:?}",
343                        e
344                    );
345                }
346            }
347
348            if endpoint.is_some() {
349                let message = format!(
350                    "Deployment {} has been patched successfully to version: {}",
351                    &self.name, &new_sha
352                );
353                match send_notification(&message, endpoint.as_deref()).await {
354                    Ok(_) => info!("Notification sent successfully"),
355                    Err(e) => {
356                        warn!("Failed to send notification: {:?}", e);
357                    }
358                }
359            }
360
361            let message = format!(
362                "Deployment: {} patched successfully to version: {}",
363                &self.name, &new_sha
364            );
365            info!(message);
366
367            return State::Success(message);
368        } else {
369            info!("Checking image: {}", &self.config.image_name);
370            if registry_checker.is_ok()
371                && !registry_checker
372                    .expect("Failed to create instance of registry checker")
373                    .check_image(&self.config.image_name, &new_sha)
374                    .await
375                    .unwrap_or(false)
376            {
377                let message = format!(
378                ":probing_cane: image: https://hub.docker.com/repository/docker/{}/tags with SHA: {} not found in registry, it is likely still building...",
379                &self.config.image_name, &new_sha
380            );
381                if endpoint.is_some() {
382                    match send_notification(&message, endpoint.as_deref()).await {
383                        Ok(_) => info!("Notification sent successfully"),
384                        Err(e) => {
385                            warn!("Failed to send notification: {:?}", e);
386                        }
387                    }
388                }
389                error!("{}", message);
390                return State::Failure(message);
391            }
392
393            let message = format!(
394                "Deployment: {} is up to date, proceeding to next deployment...",
395                &self.name
396            );
397
398            info!(message);
399            return State::Success(message);
400        }
401    }
402
403    pub async fn reconcile(AxumState(store): AxumState<Cache>) -> Json<Vec<State>> {
404        tracing::info!("Starting reconciliation");
405
406        let data: Vec<_> = store.state().iter().filter_map(|d| Entry::new(d)).collect();
407
408        let mut handles: Vec<_> = vec![];
409
410        for entry in data {
411            if !entry.config.enabled {
412                warn!("Config is disabled for deplyment: {}", &entry.name);
413                continue;
414            }
415
416            let deployment = entry.process_deployment();
417
418            handles.push(deployment);
419        }
420
421        let results = future::join_all(handles).await;
422
423        Json(results)
424    }
425}