k8s_operator_controller/
status.rs1use kube::api::{Patch, PatchParams, Resource};
2use kube::{Api, Client};
3use serde::de::DeserializeOwned;
4use serde::Serialize;
5use serde_json::json;
6use tracing::debug;
7
8use k8s_operator_core::OperatorError;
9
10pub async fn update_status<K, S>(
11 client: &Client,
12 resource: &K,
13 status: S,
14) -> k8s_operator_core::Result<()>
15where
16 K: Resource + Clone + std::fmt::Debug + Serialize + DeserializeOwned + Send + Sync + 'static,
17 K::DynamicType: Default,
18 S: Serialize,
19{
20 let name = resource.meta().name.clone().ok_or_else(|| {
21 OperatorError::Internal("Resource has no name".to_string())
22 })?;
23
24 let api: Api<K> = Api::all(client.clone());
25
26 let patch = json!({
27 "status": status
28 });
29
30 api.patch_status(&name, &PatchParams::apply("k8s-operator"), &Patch::Merge(&patch))
31 .await
32 .map_err(|e| OperatorError::KubeError(e))?;
33
34 debug!("Updated status for {}", name);
35 Ok(())
36}
37
38pub struct StatusPatch {
39 values: serde_json::Map<String, serde_json::Value>,
40}
41
42impl StatusPatch {
43 pub fn new() -> Self {
44 Self {
45 values: serde_json::Map::new(),
46 }
47 }
48
49 pub fn set<V: Serialize>(mut self, key: impl Into<String>, value: V) -> Self {
50 if let Ok(v) = serde_json::to_value(value) {
51 self.values.insert(key.into(), v);
52 }
53 self
54 }
55
56 pub fn condition(mut self, condition: StatusCondition) -> Self {
57 let conditions = self
58 .values
59 .entry("conditions".to_string())
60 .or_insert_with(|| serde_json::Value::Array(Vec::new()));
61
62 if let serde_json::Value::Array(ref mut arr) = conditions {
63 if let Ok(v) = serde_json::to_value(&condition) {
64 let existing_idx = arr.iter().position(|c| {
65 c.get("type")
66 .and_then(|t| t.as_str())
67 .map(|t| t == condition.type_)
68 .unwrap_or(false)
69 });
70
71 if let Some(idx) = existing_idx {
72 arr[idx] = v;
73 } else {
74 arr.push(v);
75 }
76 }
77 }
78
79 self
80 }
81
82 pub async fn apply<K>(self, client: &Client, resource: &K) -> k8s_operator_core::Result<()>
83 where
84 K: Resource + Clone + std::fmt::Debug + Serialize + DeserializeOwned + Send + Sync + 'static,
85 K::DynamicType: Default,
86 {
87 update_status(client, resource, serde_json::Value::Object(self.values)).await
88 }
89}
90
91impl Default for StatusPatch {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
/// A single entry of a resource's status `conditions` list.
///
/// Field names are serialized in camelCase, and optional fields are left out of
/// the output entirely while they are `None`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StatusCondition {
    /// Condition type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub type_: String,
    /// Current state of the condition.
    pub status: ConditionStatus,
    /// Optional reason for the current state; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// Optional message; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// Timestamp string, serialized as `lastTransitionTime`.
    /// `StatusCondition::new` fills it with the current UTC time in RFC 3339 form.
    pub last_transition_time: String,
    /// Optional generation number, serialized as `observedGeneration`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observed_generation: Option<i64>,
}
111
/// Three-valued state of a [`StatusCondition`].
///
/// No serde rename is applied, so each variant serializes under its own name
/// (`True`, `False`, `Unknown`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum ConditionStatus {
    /// The condition holds.
    True,
    /// The condition does not hold.
    False,
    /// The state of the condition is not known.
    Unknown,
}
118
119impl StatusCondition {
120 pub fn new(type_: impl Into<String>, status: ConditionStatus) -> Self {
121 Self {
122 type_: type_.into(),
123 status,
124 reason: None,
125 message: None,
126 last_transition_time: chrono::Utc::now().to_rfc3339(),
127 observed_generation: None,
128 }
129 }
130
131 pub fn with_reason(mut self, reason: impl Into<String>) -> Self {
132 self.reason = Some(reason.into());
133 self
134 }
135
136 pub fn with_message(mut self, message: impl Into<String>) -> Self {
137 self.message = Some(message.into());
138 self
139 }
140
141 pub fn with_generation(mut self, generation: i64) -> Self {
142 self.observed_generation = Some(generation);
143 self
144 }
145
146 pub fn ready(ready: bool) -> Self {
147 Self::new(
148 "Ready",
149 if ready {
150 ConditionStatus::True
151 } else {
152 ConditionStatus::False
153 },
154 )
155 }
156
157 pub fn available(available: bool) -> Self {
158 Self::new(
159 "Available",
160 if available {
161 ConditionStatus::True
162 } else {
163 ConditionStatus::False
164 },
165 )
166 }
167}