planspec_server/controllers/
resolution.rs1use anyhow::Result;
12use serde_json::{json, Value};
13use std::collections::{HashMap, HashSet};
14use tracing::{debug, info, warn};
15
16use crate::storage::Store;
17use crate::watch::WatchBroadcaster;
18
19pub struct PlanResolver {
21 store: Store,
22 broadcaster: WatchBroadcaster,
23}
24
25impl PlanResolver {
26 pub fn new(store: Store, broadcaster: WatchBroadcaster) -> Self {
28 Self { store, broadcaster }
29 }
30
31 #[allow(dead_code)]
33 pub async fn reconcile_namespace(&self, namespace: &str) -> Result<()> {
34 debug!(namespace, "Reconciling goals in namespace");
35
36 let goals = self.store.list(namespace, "Goal").await?;
38 if goals.is_empty() {
39 return Ok(());
40 }
41
42 let plans = self.store.list(namespace, "Plan").await?;
44
45 for goal in goals {
46 if let Err(e) = self.reconcile_goal(&goal.object, &plans).await {
47 warn!(
48 goal = goal.name,
49 namespace,
50 error = %e,
51 "Failed to reconcile goal"
52 );
53 }
54 }
55
56 Ok(())
57 }
58
59 pub async fn reconcile_all(&self) -> Result<()> {
61 debug!("Reconciling all goals");
62
63 let goals = self.store.list_all("Goal").await?;
65 if goals.is_empty() {
66 return Ok(());
67 }
68
69 let plans = self.store.list_all("Plan").await?;
71
72 let mut goals_by_ns: HashMap<String, Vec<Value>> = HashMap::new();
74 for goal in goals {
75 goals_by_ns
76 .entry(goal.namespace.clone())
77 .or_default()
78 .push(goal.object);
79 }
80
81 let mut plans_by_ns: HashMap<String, Vec<Value>> = HashMap::new();
83 for plan in plans {
84 plans_by_ns
85 .entry(plan.namespace.clone())
86 .or_default()
87 .push(plan.object);
88 }
89
90 for (ns, ns_goals) in goals_by_ns {
92 let ns_plans: Vec<_> = plans_by_ns
93 .get(&ns)
94 .map(|p| {
95 p.iter()
96 .map(|v| crate::storage::StoredObject {
97 namespace: ns.clone(),
98 kind: "Plan".to_string(),
99 name: v
100 .get("metadata")
101 .and_then(|m| m.get("name"))
102 .and_then(|n| n.as_str())
103 .unwrap_or("")
104 .to_string(),
105 object: v.clone(),
106 uid: String::new(),
107 resource_version: 0,
108 generation: 0,
109 created_at: chrono::Utc::now(),
110 updated_at: chrono::Utc::now(),
111 })
112 .collect()
113 })
114 .unwrap_or_default();
115
116 for goal in ns_goals {
117 if let Err(e) = self.reconcile_goal(&goal, &ns_plans).await {
118 let goal_name = goal
119 .get("metadata")
120 .and_then(|m| m.get("name"))
121 .and_then(|n| n.as_str())
122 .unwrap_or("unknown");
123 warn!(
124 goal = goal_name,
125 namespace = %ns,
126 error = %e,
127 "Failed to reconcile goal"
128 );
129 }
130 }
131 }
132
133 Ok(())
134 }
135
136 async fn reconcile_goal(
138 &self,
139 goal: &Value,
140 plans: &[crate::storage::StoredObject],
141 ) -> Result<()> {
142 let goal_name = goal
143 .get("metadata")
144 .and_then(|m| m.get("name"))
145 .and_then(|n| n.as_str())
146 .ok_or_else(|| anyhow::anyhow!("Goal missing metadata.name"))?;
147
148 let namespace = goal
149 .get("metadata")
150 .and_then(|m| m.get("namespace"))
151 .and_then(|n| n.as_str())
152 .unwrap_or("default");
153
154 debug!(goal = goal_name, namespace, "Reconciling goal");
155
156 let current_plan_ref = goal
158 .get("status")
159 .and_then(|s| s.get("activePlanRef"))
160 .and_then(|r| r.get("name"))
161 .and_then(|n| n.as_str());
162
163 let matching_plans: Vec<_> = plans
165 .iter()
166 .filter(|p| {
167 let goal_ref = p
168 .object
169 .get("spec")
170 .and_then(|s| s.get("goalRef"))
171 .and_then(|gr| gr.get("name"))
172 .and_then(|n| n.as_str());
173 goal_ref == Some(goal_name)
174 })
175 .collect();
176
177 if matching_plans.is_empty() {
178 debug!(goal = goal_name, "No plans found for goal");
179 if current_plan_ref.is_some() {
181 self.update_goal_status(namespace, goal_name, None).await?;
183 }
184 return Ok(());
185 }
186
187 let plan_selector = goal.get("spec").and_then(|s| s.get("planSelector"));
189 let filtered_plans: Vec<_> = if let Some(selector) = plan_selector {
190 let match_labels = selector
191 .get("matchLabels")
192 .and_then(|m| m.as_object())
193 .map(|o| {
194 o.iter()
195 .map(|(k, v)| (k.as_str(), v.as_str().unwrap_or("")))
196 .collect::<Vec<_>>()
197 })
198 .unwrap_or_default();
199
200 matching_plans
201 .into_iter()
202 .filter(|p| {
203 let labels = p
204 .object
205 .get("metadata")
206 .and_then(|m| m.get("labels"))
207 .and_then(|l| l.as_object());
208
209 match_labels.iter().all(|(k, v)| {
210 labels.and_then(|l| l.get(*k)).and_then(|lv| lv.as_str()) == Some(*v)
211 })
212 })
213 .collect()
214 } else {
215 matching_plans
216 };
217
218 if filtered_plans.is_empty() {
219 debug!(goal = goal_name, "No plans match selector for goal");
220 return Ok(());
221 }
222
223 let mut by_series: HashMap<Option<String>, Vec<_>> = HashMap::new();
225 for plan in &filtered_plans {
226 let series = plan
227 .object
228 .get("spec")
229 .and_then(|s| s.get("series"))
230 .and_then(|s| s.as_str())
231 .map(|s| s.to_string());
232 by_series.entry(series).or_default().push(plan);
233 }
234
235 let mut candidates: Vec<&crate::storage::StoredObject> = Vec::new();
237 for (_series, series_plans) in by_series {
238 let best = series_plans.into_iter().max_by(|a, b| {
239 let v_a = a
240 .object
241 .get("spec")
242 .and_then(|s| s.get("version"))
243 .and_then(|v| v.as_str())
244 .unwrap_or("0");
245 let v_b = b
246 .object
247 .get("spec")
248 .and_then(|s| s.get("version"))
249 .and_then(|v| v.as_str())
250 .unwrap_or("0");
251 compare_versions(v_a, v_b)
252 });
253 if let Some(p) = best {
254 candidates.push(p);
255 }
256 }
257
258 let superseded: HashSet<String> = candidates
260 .iter()
261 .flat_map(|p| {
262 p.object
263 .get("spec")
264 .and_then(|s| s.get("supersedes"))
265 .and_then(|ss| ss.as_array())
266 .map(|arr| {
267 arr.iter()
268 .filter_map(|r| r.get("name").and_then(|n| n.as_str()))
269 .map(|s| s.to_string())
270 .collect::<Vec<_>>()
271 })
272 .unwrap_or_default()
273 })
274 .collect();
275
276 let final_candidates: Vec<_> = candidates
277 .into_iter()
278 .filter(|p| !superseded.contains(&p.name))
279 .collect();
280
281 let selected = final_candidates.into_iter().max_by(|a, b| {
283 let v_a = a
284 .object
285 .get("spec")
286 .and_then(|s| s.get("version"))
287 .and_then(|v| v.as_str())
288 .unwrap_or("0");
289 let v_b = b
290 .object
291 .get("spec")
292 .and_then(|s| s.get("version"))
293 .and_then(|v| v.as_str())
294 .unwrap_or("0");
295 compare_versions(v_a, v_b)
296 });
297
298 if let Some(plan) = selected {
300 let plan_name = &plan.name;
301 if current_plan_ref != Some(plan_name.as_str()) {
302 info!(
303 goal = goal_name,
304 plan = %plan_name,
305 namespace,
306 "Resolved plan for goal"
307 );
308 self.update_goal_status(namespace, goal_name, Some(plan_name))
309 .await?;
310 }
311 }
312
313 Ok(())
314 }
315
316 async fn update_goal_status(
318 &self,
319 namespace: &str,
320 goal_name: &str,
321 plan_name: Option<&str>,
322 ) -> Result<()> {
323 let goal = self
325 .store
326 .get(namespace, "Goal", goal_name)
327 .await?
328 .ok_or_else(|| anyhow::anyhow!("Goal not found"))?;
329
330 let mut updated = goal.object.clone();
331
332 let generation = updated
334 .get("metadata")
335 .and_then(|m| m.get("generation"))
336 .and_then(|g| g.as_i64())
337 .unwrap_or(1);
338
339 let now = chrono::Utc::now().to_rfc3339();
340
341 if updated.get("status").is_none() {
343 updated["status"] = json!({});
344 }
345
346 updated["status"]["observedGeneration"] = json!(generation);
348
349 let mut conditions = updated
351 .get("status")
352 .and_then(|s| s.get("conditions"))
353 .and_then(|c| c.as_array())
354 .cloned()
355 .unwrap_or_default();
356
357 if let Some(plan) = plan_name {
359 updated["status"]["activePlanRef"] = json!({
360 "name": plan,
361 "namespace": namespace
362 });
363 updated["status"]["phase"] = json!("Ready");
364
365 update_condition(
367 &mut conditions,
368 "PlanResolved",
369 "True",
370 "PlanFound",
371 &format!("Plan '{}' resolved for goal", plan),
372 &now,
373 generation,
374 );
375 } else {
376 if let Some(status) = updated.get_mut("status").and_then(|s| s.as_object_mut()) {
378 status.remove("activePlanRef");
379 }
380 updated["status"]["phase"] = json!("Pending");
381
382 update_condition(
384 &mut conditions,
385 "PlanResolved",
386 "False",
387 "NoPlanFound",
388 "No matching plan found for goal",
389 &now,
390 generation,
391 );
392 }
393
394 updated["status"]["conditions"] = json!(conditions);
395
396 let (_, event) = self
398 .store
399 .replace(namespace, "Goal", goal_name, updated, None)
400 .await?;
401
402 self.broadcaster.send(event);
404
405 Ok(())
406 }
407}
408
409fn update_condition(
411 conditions: &mut Vec<Value>,
412 condition_type: &str,
413 status: &str,
414 reason: &str,
415 message: &str,
416 timestamp: &str,
417 generation: i64,
418) {
419 let new_condition = json!({
420 "type": condition_type,
421 "status": status,
422 "reason": reason,
423 "message": message,
424 "lastTransitionTime": timestamp,
425 "observedGeneration": generation
426 });
427
428 if let Some(pos) = conditions
430 .iter()
431 .position(|c| c.get("type").and_then(|t| t.as_str()) == Some(condition_type))
432 {
433 let old_status = conditions[pos].get("status").and_then(|s| s.as_str());
435 if old_status != Some(status) {
436 conditions[pos] = new_condition;
438 } else {
439 conditions[pos]["reason"] = json!(reason);
441 conditions[pos]["message"] = json!(message);
442 conditions[pos]["observedGeneration"] = json!(generation);
443 }
444 } else {
445 conditions.push(new_condition);
447 }
448}
449
/// Orders two plan version strings such as `"3"`, `"1.10"` or `"v2.0"`.
///
/// Each version is trimmed, an optional leading `v`/`V` directly followed by
/// a digit is dropped, and the remainder is split on `.`. Segments are then
/// compared position by position:
///
/// * two numeric segments compare as integers (`"10"` is greater than `"2"`);
/// * a non-numeric segment ranks below any numeric one, so `"1.0-rc1"` sorts
///   before `"1.0"`;
/// * two non-numeric segments compare as plain strings.
///
/// When one version is a prefix of the other, the longer one is greater
/// (`"1.1.0"` is greater than `"1.1"`).
///
/// Non-numeric segments are kept in place rather than discarded, so the
/// segments after them stay aligned with their counterparts.
fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
    /// One dot-separated component of a version string.
    ///
    /// Variant order is significant: the derived `Ord` ranks every `Text`
    /// below every `Number`.
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    enum Segment<'a> {
        Text(&'a str),
        Number(i64),
    }

    /// Splits a raw version string into comparable segments.
    fn segments(raw: &str) -> impl Iterator<Item = Segment<'_>> {
        let trimmed = raw.trim();
        // Only treat `v` as a prefix when a digit follows, so a word that
        // merely starts with "v" is left intact.
        let core = match trimmed.strip_prefix(|c: char| c == 'v' || c == 'V') {
            Some(rest) if rest.starts_with(|c: char| c.is_ascii_digit()) => rest,
            _ => trimmed,
        };
        core.split('.').map(|part| match part.parse::<i64>() {
            Ok(number) => Segment::Number(number),
            Err(_) => Segment::Text(part),
        })
    }

    // Lexicographic comparison: first differing segment decides, otherwise
    // the version with more segments is greater.
    segments(a).cmp(segments(b))
}
471
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Pins the ordering of plain integers and dotted numeric versions,
    /// including the rule that a longer version beats its own prefix.
    #[test]
    fn test_compare_versions() {
        let cases = [
            ("1", "2", Ordering::Less),
            ("10", "2", Ordering::Greater),
            ("1.0", "1.1", Ordering::Less),
            ("1.1.0", "1.1", Ordering::Greater),
            ("2.0", "1.9", Ordering::Greater),
        ];

        for &(left, right, expected) in cases.iter() {
            assert_eq!(
                compare_versions(left, right),
                expected,
                "comparing {:?} with {:?}",
                left,
                right
            );
        }
    }
}