1#![allow(clippy::missing_errors_doc)] #![allow(clippy::must_use_candidate)] #![allow(clippy::return_self_not_must_use)] #![allow(clippy::type_complexity)] #[cfg(feature = "derive")]
35#[doc(inline)]
36pub use kube_condition_derive::StatusCondition;
37
38use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition;
39use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
40use k8s_openapi::chrono::Utc;
41use kube::api::{Api, Patch, PatchParams};
42use kube::runtime::controller::Action;
43use kube::{Resource, ResourceExt};
44use serde::de::DeserializeOwned;
45use serde::Serialize;
46use std::fmt::Debug;
47use std::sync::Arc;
48use std::time::Duration;
49use tracing::{debug, error, info, warn};
50
/// Condition type name `"Ready"`; the type set by [`ReadyCondition`], `ready_condition`
/// and `error_condition` in this module.
pub const CONDITION_TYPE_READY: &str = "Ready";
/// Condition type name `"Progressing"`.
pub const CONDITION_TYPE_PROGRESSING: &str = "Progressing";
/// Condition type name `"Degraded"`.
pub const CONDITION_TYPE_DEGRADED: &str = "Degraded";
/// Condition type name `"Available"`.
pub const CONDITION_TYPE_AVAILABLE: &str = "Available";

/// Condition status string `"True"`.
pub const CONDITION_STATUS_TRUE: &str = "True";
/// Condition status string `"False"`.
pub const CONDITION_STATUS_FALSE: &str = "False";
/// Condition status string `"Unknown"`; the initial status of a [`ConditionBuilder`].
pub const CONDITION_STATUS_UNKNOWN: &str = "Unknown";

/// Reason recorded on a `Ready` condition whose status is `"True"`.
pub const CONDITION_REASON_RECONCILE_SUCCEEDED: &str = "ReconcileSucceeded";
/// Reason recorded by [`ReadyCondition::progressing`].
pub const CONDITION_REASON_PROGRESSING: &str = "Progressing";

/// Default requeue interval, in seconds.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed by
/// [`StatusCondition`] implementors or the derive macro — confirm.
pub const DEFAULT_REQUEUE_SECONDS: u64 = 30;
/// Requeue interval, in seconds, that [`StatusCondition::to_action`] uses for errors that
/// report themselves as not retryable.
pub const DEFAULT_NON_RETRYABLE_REQUEUE_SECONDS: u64 = 300;

/// Field-manager name passed to `PatchParams` when status conditions are patched.
pub const FIELD_MANAGER_NAME: &str = "kube-condition";
84
/// How serious a reconciliation error is.
///
/// The `reconcile_with_status` function uses this value to pick the `tracing` level
/// (`info!`, `warn!` or `error!`) at which the error is logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Severity {
    /// Logged at info level.
    Info,
    /// Logged at warn level.
    Warning,
    /// Logged at error level.
    Error,
}

impl Severity {
    /// Returns the variant name as a static string: `"Info"`, `"Warning"` or `"Error"`.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Info => "Info",
            Self::Warning => "Warning",
            Self::Error => "Error",
        }
    }
}
102
103#[derive(Debug, Clone)]
105pub struct ConditionInfo {
106 pub type_: String,
107 pub status: String,
108 pub reason: String,
109 pub message: String,
110}
111
112impl ConditionInfo {
113 pub fn into_condition(self, observed_generation: Option<i64>) -> Condition {
115 Condition {
116 type_: self.type_,
117 status: self.status,
118 reason: self.reason,
119 message: self.message,
120 last_transition_time: Time(Utc::now()),
121 observed_generation,
122 }
123 }
124}
125
126pub trait StatusCondition: std::error::Error {
130 fn to_condition_info(&self) -> ConditionInfo;
132
133 fn severity(&self) -> Severity;
135
136 fn is_retryable(&self) -> bool;
138
139 fn requeue_duration(&self) -> Duration;
141
142 fn to_condition(&self, observed_generation: Option<i64>) -> Condition {
144 self.to_condition_info().into_condition(observed_generation)
145 }
146
147 fn to_action(&self) -> Action {
149 if self.is_retryable() {
150 Action::requeue(self.requeue_duration())
151 } else {
152 Action::requeue(Duration::from_secs(DEFAULT_NON_RETRYABLE_REQUEUE_SECONDS))
154 }
155 }
156}
157
158pub fn ready_condition(ready: bool, message: impl Into<String>) -> ConditionInfo {
172 ConditionInfo {
173 type_: CONDITION_TYPE_READY.to_string(),
174 status: if ready { CONDITION_STATUS_TRUE } else { CONDITION_STATUS_FALSE }.to_string(),
175 reason: if ready { CONDITION_REASON_RECONCILE_SUCCEEDED } else { "NotReady" }.to_string(),
176 message: message.into(),
177 }
178}
179
180pub fn error_condition(reason: impl Into<String>, message: impl Into<String>) -> ConditionInfo {
193 ConditionInfo {
194 type_: CONDITION_TYPE_READY.to_string(),
195 status: CONDITION_STATUS_FALSE.to_string(),
196 reason: reason.into(),
197 message: message.into(),
198 }
199}
200
201#[derive(Debug, Clone)]
203pub struct ReadyCondition;
204
205impl ReadyCondition {
206 pub fn success(message: impl Into<String>) -> ConditionInfo {
207 ConditionInfo {
208 type_: CONDITION_TYPE_READY.to_string(),
209 status: CONDITION_STATUS_TRUE.to_string(),
210 reason: CONDITION_REASON_RECONCILE_SUCCEEDED.to_string(),
211 message: message.into(),
212 }
213 }
214
215 pub fn progressing(message: impl Into<String>) -> ConditionInfo {
216 ConditionInfo {
217 type_: CONDITION_TYPE_READY.to_string(),
218 status: CONDITION_STATUS_FALSE.to_string(),
219 reason: CONDITION_REASON_PROGRESSING.to_string(),
220 message: message.into(),
221 }
222 }
223}
224
225#[derive(Debug, Clone)]
227pub struct ConditionBuilder {
228 type_: String,
229 status: String,
230 reason: String,
231 message: String,
232}
233
234impl ConditionBuilder {
235 pub fn new(type_: impl Into<String>) -> Self {
236 Self {
237 type_: type_.into(),
238 status: CONDITION_STATUS_UNKNOWN.to_string(),
239 reason: "Unknown".to_string(),
240 message: String::new(),
241 }
242 }
243
244 pub fn status(mut self, status: impl Into<String>) -> Self {
245 self.status = status.into();
246 self
247 }
248
249 pub fn success(self) -> Self {
250 self.status(CONDITION_STATUS_TRUE)
251 }
252
253 pub fn failure(self) -> Self {
254 self.status(CONDITION_STATUS_FALSE)
255 }
256
257 pub fn reason(mut self, reason: impl Into<String>) -> Self {
258 self.reason = reason.into();
259 self
260 }
261
262 pub fn message(mut self, message: impl Into<String>) -> Self {
263 self.message = message.into();
264 self
265 }
266
267 pub fn build(self, observed_generation: Option<i64>) -> Condition {
268 Condition {
269 type_: self.type_,
270 status: self.status,
271 reason: self.reason,
272 message: self.message,
273 last_transition_time: Time(Utc::now()),
274 observed_generation,
275 }
276 }
277}
278
279#[async_trait::async_trait]
281pub trait ConditionExt: Resource + Clone + Debug + Send + Sync + 'static
282where
283 Self::DynamicType: Default,
284{
285 async fn update_condition(
287 &self,
288 client: &kube::Client,
289 condition: Condition,
290 ) -> Result<(), kube::Error>;
291
292 async fn update_conditions(
294 &self,
295 client: &kube::Client,
296 conditions: Vec<Condition>,
297 ) -> Result<(), kube::Error>;
298
299 async fn set_ready(
301 &self,
302 client: &kube::Client,
303 message: impl Into<String> + Send,
304 ) -> Result<(), kube::Error> {
305 let generation = self.meta().generation;
306 let condition = ReadyCondition::success(message).into_condition(generation);
307 self.update_condition(client, condition).await
308 }
309
310 async fn set_error<E: StatusCondition + Send + Sync>(
312 &self,
313 client: &kube::Client,
314 error: &E,
315 ) -> Result<(), kube::Error> {
316 let generation = self.meta().generation;
317 let condition = error.to_condition(generation);
318 self.update_condition(client, condition).await
319 }
320}
321
322#[async_trait::async_trait]
324impl<T> ConditionExt for T
325where
326 T: Resource<Scope = kube::core::NamespaceResourceScope> + Clone + Debug + Send + Sync + 'static,
327 T::DynamicType: Default,
328 T: Serialize + DeserializeOwned,
329{
330 async fn update_condition(
331 &self,
332 client: &kube::Client,
333 condition: Condition,
334 ) -> Result<(), kube::Error> {
335 self.update_conditions(client, vec![condition]).await
336 }
337
338 async fn update_conditions(
339 &self,
340 client: &kube::Client,
341 conditions: Vec<Condition>,
342 ) -> Result<(), kube::Error> {
343 let name = self.name_any();
344 let namespace = self.namespace().ok_or_else(|| {
345 kube::Error::Api(kube::error::ErrorResponse {
346 status: "Failure".to_string(),
347 message: "Resource must be namespaced".to_string(),
348 reason: "MissingNamespace".to_string(),
349 code: 400,
350 })
351 })?;
352
353 let api: Api<T> = Api::namespaced(client.clone(), &namespace);
354
355 let patch = serde_json::json!({
357 "status": {
358 "conditions": conditions
359 }
360 });
361
362 let pp = PatchParams::apply(FIELD_MANAGER_NAME).force();
363
364 debug!(
365 resource = %name,
366 namespace = %namespace,
367 conditions = ?conditions.iter().map(|c| &c.type_).collect::<Vec<_>>(),
368 "Updating status conditions"
369 );
370
371 api.patch_status(&name, &pp, &Patch::Merge(&patch)).await?;
372
373 Ok(())
374 }
375}
376
377pub struct ReconcileContext<T> {
379 pub client: kube::Client,
380 _phantom: std::marker::PhantomData<T>,
381}
382
383impl<T> ReconcileContext<T> {
384 pub fn new(client: kube::Client) -> Self {
385 Self { client, _phantom: std::marker::PhantomData }
386 }
387}
388
/// Declares a constructor that wraps a reconcile function.
///
/// No implementation of this trait is visible in this file.
pub trait ReconcileExt<T, E>
where
    T: Resource + Clone + Debug + Send + Sync + 'static + Serialize + DeserializeOwned,
    T::DynamicType: Default,
    E: StatusCondition + Send + Sync,
{
    /// Takes `reconcile_fn` and returns a closure with the same
    /// `(Arc<T>, Arc<ReconcileContext<T>>)` arguments that yields a boxed, pinned,
    /// `Send` future resolving to `Result<Action, E>`.
    ///
    /// NOTE(review): presumably the returned closure runs `reconcile_fn` and records
    /// the outcome as a status condition — confirm against the implementors.
    fn with_status_update<F, Fut>(
        reconcile_fn: F,
    ) -> impl Fn(
        Arc<T>,
        Arc<ReconcileContext<T>>,
    )
        -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Action, E>> + Send>>
    where
        F: Fn(Arc<T>, Arc<ReconcileContext<T>>) -> Fut + Send + Sync + Clone + 'static,
        Fut: std::future::Future<Output = Result<Action, E>> + Send + 'static;
}
408
409pub async fn reconcile_with_status<T, E, F, Fut>(
411 obj: Arc<T>,
412 ctx: Arc<ReconcileContext<T>>,
413 reconcile_fn: F,
414) -> Result<Action, E>
415where
416 T: Resource<Scope = kube::core::NamespaceResourceScope>
417 + Clone
418 + Debug
419 + Send
420 + Sync
421 + 'static
422 + Serialize
423 + DeserializeOwned,
424 T::DynamicType: Default,
425 E: StatusCondition + Send + Sync,
426 F: Fn(Arc<T>, Arc<ReconcileContext<T>>) -> Fut,
427 Fut: std::future::Future<Output = Result<Action, E>>,
428{
429 let name = obj.name_any();
430 let generation = obj.meta().generation;
431
432 match reconcile_fn(obj.clone(), ctx.clone()).await {
433 Ok(action) => {
434 let condition =
436 ReadyCondition::success("Reconciliation succeeded").into_condition(generation);
437
438 if let Err(e) = obj.update_condition(&ctx.client, condition).await {
439 error!(resource = %name, error = %e, "Failed to update success status");
440 } else {
441 info!(resource = %name, "Reconciliation succeeded");
442 }
443
444 Ok(action)
445 }
446 Err(e) => {
447 let condition = e.to_condition(generation);
449 let severity = e.severity();
450 let action = e.to_action();
451
452 match severity {
453 Severity::Info => info!(resource = %name, error = %e, "Reconciliation info"),
454 Severity::Warning => warn!(resource = %name, error = %e, "Reconciliation warning"),
455 Severity::Error => error!(resource = %name, error = %e, "Reconciliation failed"),
456 }
457
458 if let Err(status_err) = obj.update_condition(&ctx.client, condition).await {
459 error!(
460 resource = %name,
461 original_error = %e,
462 status_error = %status_err,
463 "Failed to update error status"
464 );
465 }
466
467 Ok(action)
470 }
471 }
472}
473
/// Wraps a reconcile function in a closure suitable for passing where a reconciler
/// closure is expected.
///
/// Expands to a closure `|obj, ctx|` that calls `$crate::reconcile_with_status` with
/// the given expression and returns the resulting future boxed and pinned.
#[macro_export]
macro_rules! reconcile_with_status {
    ($reconcile_fn:expr) => {
        |obj, ctx| {
            let fut = $crate::reconcile_with_status(obj, ctx, $reconcile_fn);
            Box::pin(fut)
        }
    };
}
481
/// Unit tests for the condition helpers, constants and the `StatusCondition` defaults.
#[cfg(test)]
mod tests {
    use super::*;

    // --- ConditionBuilder -------------------------------------------------------------

    #[test]
    fn test_condition_builder_success() {
        let condition = ConditionBuilder::new("Synchronized")
            .success()
            .reason("ZoneUpdated")
            .message("Zone file synchronized successfully")
            .build(Some(5));

        assert_eq!(condition.type_, "Synchronized");
        assert_eq!(condition.status, CONDITION_STATUS_TRUE);
        assert_eq!(condition.reason, "ZoneUpdated");
        assert_eq!(condition.message, "Zone file synchronized successfully");
        assert_eq!(condition.observed_generation, Some(5));
    }

    #[test]
    fn test_condition_builder_failure() {
        let condition = ConditionBuilder::new("Ready")
            .failure()
            .reason("ValidationFailed")
            .message("Resource validation failed")
            .build(None);

        assert_eq!(condition.type_, CONDITION_TYPE_READY);
        assert_eq!(condition.status, CONDITION_STATUS_FALSE);
        assert_eq!(condition.reason, "ValidationFailed");
        assert_eq!(condition.message, "Resource validation failed");
        assert_eq!(condition.observed_generation, None);
    }

    #[test]
    fn test_condition_builder_custom_status() {
        let condition = ConditionBuilder::new("Custom")
            .status("Unknown")
            .reason("Investigating")
            .message("Status unknown")
            .build(Some(1));

        assert_eq!(condition.type_, "Custom");
        assert_eq!(condition.status, CONDITION_STATUS_UNKNOWN);
        assert_eq!(condition.reason, "Investigating");
    }

    // --- ReadyCondition ---------------------------------------------------------------

    #[test]
    fn test_ready_condition_success() {
        let info = ReadyCondition::success("All good");
        assert_eq!(info.type_, CONDITION_TYPE_READY);
        assert_eq!(info.status, CONDITION_STATUS_TRUE);
        assert_eq!(info.reason, CONDITION_REASON_RECONCILE_SUCCEEDED);
        assert_eq!(info.message, "All good");
    }

    #[test]
    fn test_ready_condition_progressing() {
        let info = ReadyCondition::progressing("Initializing resources");
        assert_eq!(info.type_, CONDITION_TYPE_READY);
        assert_eq!(info.status, CONDITION_STATUS_FALSE);
        assert_eq!(info.reason, CONDITION_REASON_PROGRESSING);
        assert_eq!(info.message, "Initializing resources");
    }

    // --- Free helper functions --------------------------------------------------------

    #[test]
    fn test_ready_condition_helper_true() {
        let info = ready_condition(true, "System operational");
        assert_eq!(info.type_, CONDITION_TYPE_READY);
        assert_eq!(info.status, CONDITION_STATUS_TRUE);
        assert_eq!(info.reason, CONDITION_REASON_RECONCILE_SUCCEEDED);
        assert_eq!(info.message, "System operational");
    }

    #[test]
    fn test_ready_condition_helper_false() {
        let info = ready_condition(false, "System not ready");
        assert_eq!(info.type_, CONDITION_TYPE_READY);
        assert_eq!(info.status, CONDITION_STATUS_FALSE);
        assert_eq!(info.reason, "NotReady");
        assert_eq!(info.message, "System not ready");
    }

    #[test]
    fn test_error_condition_helper() {
        let info = error_condition("DatabaseConnectionFailed", "Unable to connect to database");
        assert_eq!(info.type_, CONDITION_TYPE_READY);
        assert_eq!(info.status, CONDITION_STATUS_FALSE);
        assert_eq!(info.reason, "DatabaseConnectionFailed");
        assert_eq!(info.message, "Unable to connect to database");
    }

    // --- ConditionInfo ----------------------------------------------------------------

    #[test]
    fn test_condition_info_into_condition_with_generation() {
        let info = ConditionInfo {
            type_: CONDITION_TYPE_READY.to_string(),
            status: CONDITION_STATUS_TRUE.to_string(),
            reason: CONDITION_REASON_RECONCILE_SUCCEEDED.to_string(),
            message: "Test message".to_string(),
        };

        let condition = info.into_condition(Some(42));
        assert_eq!(condition.type_, CONDITION_TYPE_READY);
        assert_eq!(condition.status, CONDITION_STATUS_TRUE);
        assert_eq!(condition.reason, CONDITION_REASON_RECONCILE_SUCCEEDED);
        assert_eq!(condition.message, "Test message");
        assert_eq!(condition.observed_generation, Some(42));
    }

    #[test]
    fn test_condition_info_into_condition_without_generation() {
        let info = ConditionInfo {
            type_: CONDITION_TYPE_AVAILABLE.to_string(),
            status: CONDITION_STATUS_FALSE.to_string(),
            reason: "Unavailable".to_string(),
            message: "Service unavailable".to_string(),
        };

        let condition = info.into_condition(None);
        assert_eq!(condition.type_, CONDITION_TYPE_AVAILABLE);
        assert_eq!(condition.observed_generation, None);
    }

    // --- Severity ---------------------------------------------------------------------

    #[test]
    fn test_severity_as_str() {
        assert_eq!(Severity::Info.as_str(), "Info");
        assert_eq!(Severity::Warning.as_str(), "Warning");
        assert_eq!(Severity::Error.as_str(), "Error");
    }

    #[test]
    fn test_severity_equality() {
        assert_eq!(Severity::Info, Severity::Info);
        assert_eq!(Severity::Warning, Severity::Warning);
        assert_eq!(Severity::Error, Severity::Error);
        assert_ne!(Severity::Info, Severity::Warning);
        assert_ne!(Severity::Warning, Severity::Error);
    }

    // --- Constants --------------------------------------------------------------------

    #[test]
    fn test_condition_type_constants() {
        assert_eq!(CONDITION_TYPE_READY, "Ready");
        assert_eq!(CONDITION_TYPE_PROGRESSING, "Progressing");
        assert_eq!(CONDITION_TYPE_DEGRADED, "Degraded");
        assert_eq!(CONDITION_TYPE_AVAILABLE, "Available");
    }

    #[test]
    fn test_condition_status_constants() {
        assert_eq!(CONDITION_STATUS_TRUE, "True");
        assert_eq!(CONDITION_STATUS_FALSE, "False");
        assert_eq!(CONDITION_STATUS_UNKNOWN, "Unknown");
    }

    #[test]
    fn test_condition_reason_constants() {
        assert_eq!(CONDITION_REASON_RECONCILE_SUCCEEDED, "ReconcileSucceeded");
        assert_eq!(CONDITION_REASON_PROGRESSING, "Progressing");
    }

    #[test]
    fn test_requeue_duration_constants() {
        assert_eq!(DEFAULT_REQUEUE_SECONDS, 30);
        assert_eq!(DEFAULT_NON_RETRYABLE_REQUEUE_SECONDS, 300);
    }

    #[test]
    fn test_field_manager_constant() {
        assert_eq!(FIELD_MANAGER_NAME, "kube-condition");
    }

    // --- StatusCondition default methods ----------------------------------------------

    /// Minimal error whose retry behaviour is configurable per test.
    #[derive(Debug)]
    struct MockError {
        retryable: bool,
        requeue_secs: u64,
    }

    impl std::fmt::Display for MockError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "Mock error")
        }
    }

    impl std::error::Error for MockError {}

    impl StatusCondition for MockError {
        fn to_condition_info(&self) -> ConditionInfo {
            ConditionInfo {
                type_: CONDITION_TYPE_READY.to_string(),
                status: CONDITION_STATUS_FALSE.to_string(),
                reason: "MockError".to_string(),
                message: "Mock error occurred".to_string(),
            }
        }

        fn severity(&self) -> Severity {
            Severity::Error
        }

        fn is_retryable(&self) -> bool {
            self.retryable
        }

        fn requeue_duration(&self) -> Duration {
            Duration::from_secs(self.requeue_secs)
        }
    }

    #[test]
    fn test_status_condition_to_condition() {
        let error = MockError { retryable: true, requeue_secs: 60 };

        let condition = error.to_condition(Some(10));
        assert_eq!(condition.type_, CONDITION_TYPE_READY);
        assert_eq!(condition.status, CONDITION_STATUS_FALSE);
        assert_eq!(condition.reason, "MockError");
        assert_eq!(condition.message, "Mock error occurred");
        assert_eq!(condition.observed_generation, Some(10));
    }

    #[test]
    fn test_status_condition_to_action_retryable() {
        let error = MockError { retryable: true, requeue_secs: 60 };

        let action = error.to_action();
        assert!(error.is_retryable());
        assert_eq!(error.requeue_duration(), Duration::from_secs(60));
        // A retryable error must requeue after its own duration.
        assert_eq!(action, Action::requeue(Duration::from_secs(60)));
    }

    #[test]
    fn test_status_condition_to_action_non_retryable() {
        let error = MockError {
            retryable: false,
            requeue_secs: 60,
        };

        let action = error.to_action();
        assert!(!error.is_retryable());
        // A non-retryable error ignores its own duration (60s) and uses the default.
        assert_eq!(
            action,
            Action::requeue(Duration::from_secs(DEFAULT_NON_RETRYABLE_REQUEUE_SECONDS))
        );
    }
}