gitops_operator/configuration/
configuration.rs1use crate::files::{needs_patching, patch_deployment};
2use crate::git::{clone_repo, commit_changes, get_latest_commit};
3use crate::notifications::send as send_notification;
4use anyhow::{Context, Error};
5use axum::extract::State as AxumState;
6use k8s_openapi::api::apps::v1::Deployment;
7use k8s_openapi::api::core::v1::Secret;
8use kube::{Api, Client, ResourceExt};
9use std::collections::BTreeMap;
10use std::fs::remove_dir_all;
11use std::path::Path;
12use tracing::{error, info, warn};
13
14use crate::registry::{get_registry_auth_from_secret, RegistryChecker};
15
16use axum::Json;
17use futures::future;
18use kube::runtime::reflector;
19
20type Cache = reflector::Store<Deployment>;
21
22#[derive(serde::Serialize, Clone, Debug, PartialEq)]
23pub enum State {
24 Queued,
25 Processing(String),
26 Success(String),
27 Failure(String),
28}
29
30#[derive(serde::Serialize, Clone, Debug, PartialEq)]
31pub struct Config {
32 pub enabled: bool,
33 pub namespace: String,
34 pub app_repository: String,
35 pub manifest_repository: String,
36 pub image_name: String,
37 pub deployment_path: String,
38 pub observe_branch: String,
39 pub tag_type: String,
40 pub ssh_key_name: String,
41 pub ssh_key_namespace: String,
42 pub notifications_secret_name: Option<String>,
43 pub notifications_secret_namespace: Option<String>,
44 pub registry_url: Option<String>,
45 pub registry_secret_name: Option<String>,
46 pub registry_secret_namespace: Option<String>,
47 pub state: State,
48}
49
50#[derive(serde::Serialize, Clone, Debug, PartialEq)]
51pub struct Entry {
52 pub container: String,
53 pub name: String,
54 pub namespace: String,
55 pub annotations: BTreeMap<String, String>,
56 pub version: String,
57 #[serde(default)]
58 pub config: Config,
59}
60
61impl Entry {
62 pub fn new(d: &Deployment) -> Option<Entry> {
63 let name = d.name_any();
64 let namespace = d.namespace()?;
65 let annotations = d.metadata.annotations.as_ref()?;
66 let tpl = d.spec.as_ref()?.template.spec.as_ref()?;
67 let img = tpl.containers.first()?.image.as_ref()?;
68 let splits = img.splitn(2, ':').collect::<Vec<_>>();
69 let (container, version) = match *splits.as_slice() {
70 [c, v] => (c.to_owned(), v.to_owned()),
71 [c] => (c.to_owned(), "latest".to_owned()),
72 _ => return None,
73 };
74
75 let enabled = annotations
76 .get("gitops.operator.enabled")?
77 .trim()
78 .parse()
79 .unwrap_or(false);
80 let app_repository = annotations
81 .get("gitops.operator.app_repository")?
82 .to_string();
83 let manifest_repository = annotations
84 .get("gitops.operator.manifest_repository")?
85 .to_string();
86 let image_name = annotations.get("gitops.operator.image_name")?.to_string();
87 let deployment_path = annotations
88 .get("gitops.operator.deployment_path")?
89 .to_string();
90 let observe_branch = annotations
91 .get("gitops.operator.observe_branch")
92 .unwrap_or(&"master".to_string())
93 .to_string();
94 let tag_type = annotations
95 .get("gitops.operator.tag_type")
96 .unwrap_or(&"long".to_string())
97 .to_string();
98
99 let tag_type = match tag_type.as_str() {
100 "short" => "short",
101 _ => "long",
102 }
103 .to_string();
104
105 let ssh_key_name = annotations.get("gitops.operator.ssh_key_name")?.to_string();
106 let ssh_key_namespace = annotations
107 .get("gitops.operator.ssh_key_namespace")?
108 .to_string();
109
110 let notifications_secret_name = annotations
111 .get("gitops.operator.notifications_secret_name")
112 .map(|name| name.to_string());
113
114 let notifications_secret_namespace = annotations
115 .get("gitops.operator.notifications_secret_namespace")
116 .map(|name| name.to_string());
117
118 let registry_url = annotations
119 .get("gitops.operator.registry_secret_url")
120 .map(|name| name.to_string());
121
122 let registry_secret_name = annotations
123 .get("gitops.operator.registry_secret_name")
124 .map(|name| name.to_string());
125
126 let registry_secret_namespace = annotations
127 .get("gitops.operator.registry_secret_namespace")
128 .map(|name| name.to_string());
129
130 info!("Processing: {}/{}", &namespace, &name);
131
132 Some(Entry {
133 name,
134 namespace: namespace.clone(),
135 annotations: annotations.clone(),
136 container,
137 version,
138 config: Config {
139 enabled,
140 namespace: namespace.clone(),
141 app_repository,
142 manifest_repository,
143 image_name,
144 deployment_path,
145 observe_branch,
146 tag_type,
147 ssh_key_name,
148 ssh_key_namespace,
149 notifications_secret_name,
150 notifications_secret_namespace,
151 registry_url,
152 registry_secret_name,
153 registry_secret_namespace,
154 state: State::Queued,
155 },
156 })
157 }
158
159 async fn get_ssh_key(self) -> Result<String, Error> {
160 let client = Client::try_default().await?;
161 let secrets: Api<Secret> = Api::namespaced(client, &self.config.ssh_key_namespace);
162 let secret = secrets.get(&self.config.ssh_key_name).await?;
163
164 let secret_data = secret.data.context("Failed to read the data section")?;
165
166 let encoded_key = secret_data
167 .get("ssh-privatekey")
168 .context("Failed to read field: ssh-privatekey in data, consider recreating the secret with kubectl create secret generic name --from-file=ssh-privatekey=/path")?;
169
170 let key_bytes = encoded_key.0.clone();
171
172 String::from_utf8(key_bytes).context("Failed to convert key to string")
173 }
174
175 async fn get_notifications_secret(name: &str, namespace: &str) -> Result<String, Error> {
177 if name.is_empty() {
178 return Ok(String::new());
179 }
180
181 let client = Client::try_default().await?;
182 let secrets: Api<Secret> = Api::namespaced(client, namespace);
183 let secret = secrets.get(name).await?;
184
185 let secret_data = secret.data.context("Failed to read the data section")?;
186
187 let encoded_url = secret_data
188 .get("webhook-url")
189 .context("Failed to read field: webhook-url in data, consider recreating the secret with kubectl create secret generic webhook-secret-name -n your_namespace --from-literal=webhook-url=https://hooks.sl...")?;
190
191 let bytes = encoded_url.0.clone();
192
193 String::from_utf8(bytes).context("Failed to convert key to string")
194 }
195
196 async fn get_notifications_endpoint(&self) -> Option<String> {
197 match Entry::get_notifications_secret(
198 &self
199 .config
200 .notifications_secret_name
201 .clone()
202 .unwrap_or_default(),
203 &self
204 .config
205 .notifications_secret_namespace
206 .clone()
207 .unwrap_or("gitops-operator".to_string()),
208 )
209 .await
210 {
211 Ok(endpoint) => Some(endpoint),
212 Err(e) => {
213 warn!("Failed to get notifications secret: {:?}", e);
214 None
215 }
216 }
217 }
218
219 #[tracing::instrument(name = "process_deployment", skip(self), fields())]
220 pub async fn process_deployment(self) -> State {
221 info!("Processing: {}/{}", &self.namespace, &self.name);
222
223 let endpoint = &self.get_notifications_endpoint().await;
224
225 let ssh_key_secret = match self.clone().get_ssh_key().await {
226 Ok(key) => key,
227 Err(e) => {
228 error!("Failed to get SSH key: {:?}", e);
229 return State::Failure(format!("Failed to get SSH key: {:#?}", e));
230 }
231 };
232
233 let registry_url = self
234 .config
235 .registry_url
236 .as_deref()
237 .unwrap_or("https://index.docker.io/v1/");
238
239 let registry_credentials = get_registry_auth_from_secret(
240 self.config
241 .registry_secret_name
242 .as_deref()
243 .unwrap_or("regcred"),
244 self.config
245 .registry_secret_namespace
246 .as_deref()
247 .unwrap_or("gitops-operator"),
248 registry_url,
249 )
250 .await;
251
252 info!("Creating registry checker for: {}", registry_url);
253 let registry_checker = match registry_credentials {
254 Ok(credentials) => {
255 RegistryChecker::new(registry_url.to_string(), Some(credentials.to_string())).await
256 }
257 Err(e) => {
258 error!("Failed to get registry credentials: {:?}", e);
259 Err(e)
261 }
262 };
263
264 info!("Performing reconciliation for: {}", &self.name);
266 let app_repo_path = format!("/tmp/app-{}-{}/", &self.name, &self.config.observe_branch);
267 let manifest_repo_path = format!(
268 "/tmp/manifest-{}-{}/",
269 &self.name, &self.config.observe_branch
270 );
271
272 info!("Cloning repositories for: {}", &self.name);
274 let app_clone = {
275 let repo = self.config.app_repository.clone();
276 let path = app_repo_path.clone();
277 let branch = self.config.observe_branch.clone();
278 let ssh_key_secret = ssh_key_secret.clone();
279 tokio::task::spawn_blocking(move || clone_repo(&repo, &path, &branch, &ssh_key_secret))
280 };
281
282 let manifest_clone = {
283 let repo = self.config.manifest_repository.clone();
284 let path = manifest_repo_path.clone();
285 let branch = self.config.observe_branch.clone();
286 let ssh_key_secret = ssh_key_secret.clone();
287 tokio::task::spawn_blocking(move || clone_repo(&repo, &path, &branch, &ssh_key_secret))
288 };
289
290 if let Err(e) = tokio::try_join!(app_clone, manifest_clone) {
292 error!("Failed to clone repositories: {:?}", e);
293 }
294
295 info!("Getting latest commit for: {}", &self.name);
297 let new_sha = get_latest_commit(
298 Path::new(&app_repo_path),
299 &self.config.observe_branch,
300 &self.config.tag_type,
301 &ssh_key_secret,
302 );
303
304 let new_sha = match new_sha {
305 Ok(sha) => sha,
306 Err(e) => {
307 error!("Failed to get latest SHA: {:?}", e);
308 return State::Failure(format!("Failed to get latest SHA: {:#?}", e));
309 }
310 };
311
312 let deployment_path = format!("{}/{}", &manifest_repo_path, &self.config.deployment_path);
313
314 if needs_patching(&deployment_path, &new_sha).unwrap_or(false) {
315 match patch_deployment(&deployment_path, &self.config.image_name, &new_sha) {
316 Ok(_) => info!("File patched successfully for: {}", &self.name),
317 Err(e) => {
318 let _ = remove_dir_all(&manifest_repo_path);
319
320 if endpoint.is_some() {
321 let message = format!(
322 "Failed to patch deployment: {} to version: {}",
323 &self.name, &new_sha
324 );
325 match send_notification(&message, endpoint.as_deref()).await {
326 Ok(_) => info!("Notification sent successfully"),
327 Err(e) => {
328 warn!("Failed to send notification: {:?}", e);
329 }
330 }
331 }
332
333 error!("Failed to patch deployment: {:?}", e);
334 }
335 }
336
337 match commit_changes(&manifest_repo_path, &ssh_key_secret) {
338 Ok(_) => info!("Changes committed successfully"),
339 Err(e) => {
340 let _ = remove_dir_all(&manifest_repo_path);
341 error!(
342 "Failed to commit changes, cleaning up manifests repo for next run: {:?}",
343 e
344 );
345 }
346 }
347
348 if endpoint.is_some() {
349 let message = format!(
350 "Deployment {} has been patched successfully to version: {}",
351 &self.name, &new_sha
352 );
353 match send_notification(&message, endpoint.as_deref()).await {
354 Ok(_) => info!("Notification sent successfully"),
355 Err(e) => {
356 warn!("Failed to send notification: {:?}", e);
357 }
358 }
359 }
360
361 let message = format!(
362 "Deployment: {} patched successfully to version: {}",
363 &self.name, &new_sha
364 );
365 info!(message);
366
367 return State::Success(message);
368 } else {
369 info!("Checking image: {}", &self.config.image_name);
370 if registry_checker.is_ok()
371 && !registry_checker
372 .expect("Failed to create instance of registry checker")
373 .check_image(&self.config.image_name, &new_sha)
374 .await
375 .unwrap_or(false)
376 {
377 let message = format!(
378 ":probing_cane: image: https://hub.docker.com/repository/docker/{}/tags with SHA: {} not found in registry, it is likely still building...",
379 &self.config.image_name, &new_sha
380 );
381 if endpoint.is_some() {
382 match send_notification(&message, endpoint.as_deref()).await {
383 Ok(_) => info!("Notification sent successfully"),
384 Err(e) => {
385 warn!("Failed to send notification: {:?}", e);
386 }
387 }
388 }
389 error!("{}", message);
390 return State::Failure(message);
391 }
392
393 let message = format!(
394 "Deployment: {} is up to date, proceeding to next deployment...",
395 &self.name
396 );
397
398 info!(message);
399 return State::Success(message);
400 }
401 }
402
403 pub async fn reconcile(AxumState(store): AxumState<Cache>) -> Json<Vec<State>> {
404 tracing::info!("Starting reconciliation");
405
406 let data: Vec<_> = store.state().iter().filter_map(|d| Entry::new(d)).collect();
407
408 let mut handles: Vec<_> = vec![];
409
410 for entry in data {
411 if !entry.config.enabled {
412 warn!("Config is disabled for deplyment: {}", &entry.name);
413 continue;
414 }
415
416 let deployment = entry.process_deployment();
417
418 handles.push(deployment);
419 }
420
421 let results = future::join_all(handles).await;
422
423 Json(results)
424 }
425}