Skip to main content

planspec_server/controllers/
resolution.rs

1//! Plan resolution controller - determines which Plan satisfies a Goal.
2//!
3//! Resolution algorithm:
4//! 1. Find Plans where `spec.goalRef.name` matches the Goal
5//! 2. Filter by `planSelector` labels if specified on Goal
6//! 3. Group by `spec.series`
7//! 4. Within each series, select highest `spec.version`
8//! 5. Exclude Plans that are superseded by others
9//! 6. Set `Goal.status.activePlanRef` to selected Plan
10
11use anyhow::Result;
12use serde_json::{json, Value};
13use std::collections::{HashMap, HashSet};
14use tracing::{debug, info, warn};
15
16use crate::storage::Store;
17use crate::watch::WatchBroadcaster;
18
/// Controller that resolves Plans for Goals.
///
/// For each Goal it selects the Plan that should be active and records the
/// outcome on the Goal's `status` (`activePlanRef`, `phase`, and the
/// `PlanResolved` condition).
pub struct PlanResolver {
    /// Backing store used to list, fetch and replace `Goal` / `Plan` objects.
    store: Store,
    /// Receives the change event returned by the store after a Goal's status
    /// has been replaced.
    broadcaster: WatchBroadcaster,
}
24
25impl PlanResolver {
26    /// Create a new PlanResolver
27    pub fn new(store: Store, broadcaster: WatchBroadcaster) -> Self {
28        Self { store, broadcaster }
29    }
30
31    /// Run resolution for all Goals in a namespace
32    #[allow(dead_code)]
33    pub async fn reconcile_namespace(&self, namespace: &str) -> Result<()> {
34        debug!(namespace, "Reconciling goals in namespace");
35
36        // Get all Goals in namespace
37        let goals = self.store.list(namespace, "Goal").await?;
38        if goals.is_empty() {
39            return Ok(());
40        }
41
42        // Get all Plans in namespace
43        let plans = self.store.list(namespace, "Plan").await?;
44
45        for goal in goals {
46            if let Err(e) = self.reconcile_goal(&goal.object, &plans).await {
47                warn!(
48                    goal = goal.name,
49                    namespace,
50                    error = %e,
51                    "Failed to reconcile goal"
52                );
53            }
54        }
55
56        Ok(())
57    }
58
59    /// Run resolution for all Goals across all namespaces
60    pub async fn reconcile_all(&self) -> Result<()> {
61        debug!("Reconciling all goals");
62
63        // Get all Goals
64        let goals = self.store.list_all("Goal").await?;
65        if goals.is_empty() {
66            return Ok(());
67        }
68
69        // Get all Plans
70        let plans = self.store.list_all("Plan").await?;
71
72        // Group goals by namespace for efficiency
73        let mut goals_by_ns: HashMap<String, Vec<Value>> = HashMap::new();
74        for goal in goals {
75            goals_by_ns
76                .entry(goal.namespace.clone())
77                .or_default()
78                .push(goal.object);
79        }
80
81        // Group plans by namespace
82        let mut plans_by_ns: HashMap<String, Vec<Value>> = HashMap::new();
83        for plan in plans {
84            plans_by_ns
85                .entry(plan.namespace.clone())
86                .or_default()
87                .push(plan.object);
88        }
89
90        // Reconcile each namespace
91        for (ns, ns_goals) in goals_by_ns {
92            let ns_plans: Vec<_> = plans_by_ns
93                .get(&ns)
94                .map(|p| {
95                    p.iter()
96                        .map(|v| crate::storage::StoredObject {
97                            namespace: ns.clone(),
98                            kind: "Plan".to_string(),
99                            name: v
100                                .get("metadata")
101                                .and_then(|m| m.get("name"))
102                                .and_then(|n| n.as_str())
103                                .unwrap_or("")
104                                .to_string(),
105                            object: v.clone(),
106                            uid: String::new(),
107                            resource_version: 0,
108                            generation: 0,
109                            created_at: chrono::Utc::now(),
110                            updated_at: chrono::Utc::now(),
111                        })
112                        .collect()
113                })
114                .unwrap_or_default();
115
116            for goal in ns_goals {
117                if let Err(e) = self.reconcile_goal(&goal, &ns_plans).await {
118                    let goal_name = goal
119                        .get("metadata")
120                        .and_then(|m| m.get("name"))
121                        .and_then(|n| n.as_str())
122                        .unwrap_or("unknown");
123                    warn!(
124                        goal = goal_name,
125                        namespace = %ns,
126                        error = %e,
127                        "Failed to reconcile goal"
128                    );
129                }
130            }
131        }
132
133        Ok(())
134    }
135
136    /// Reconcile a single Goal - find and set its active Plan
137    async fn reconcile_goal(
138        &self,
139        goal: &Value,
140        plans: &[crate::storage::StoredObject],
141    ) -> Result<()> {
142        let goal_name = goal
143            .get("metadata")
144            .and_then(|m| m.get("name"))
145            .and_then(|n| n.as_str())
146            .ok_or_else(|| anyhow::anyhow!("Goal missing metadata.name"))?;
147
148        let namespace = goal
149            .get("metadata")
150            .and_then(|m| m.get("namespace"))
151            .and_then(|n| n.as_str())
152            .unwrap_or("default");
153
154        debug!(goal = goal_name, namespace, "Reconciling goal");
155
156        // Get current activePlanRef
157        let current_plan_ref = goal
158            .get("status")
159            .and_then(|s| s.get("activePlanRef"))
160            .and_then(|r| r.get("name"))
161            .and_then(|n| n.as_str());
162
163        // Step 1: Find Plans that reference this Goal
164        let matching_plans: Vec<_> = plans
165            .iter()
166            .filter(|p| {
167                let goal_ref = p
168                    .object
169                    .get("spec")
170                    .and_then(|s| s.get("goalRef"))
171                    .and_then(|gr| gr.get("name"))
172                    .and_then(|n| n.as_str());
173                goal_ref == Some(goal_name)
174            })
175            .collect();
176
177        if matching_plans.is_empty() {
178            debug!(goal = goal_name, "No plans found for goal");
179            // No plans found - set phase to Pending if not already set
180            if current_plan_ref.is_some() {
181                // Had a plan but now none - update status
182                self.update_goal_status(namespace, goal_name, None).await?;
183            }
184            return Ok(());
185        }
186
187        // Step 2: Filter by planSelector if specified
188        let plan_selector = goal.get("spec").and_then(|s| s.get("planSelector"));
189        let filtered_plans: Vec<_> = if let Some(selector) = plan_selector {
190            let match_labels = selector
191                .get("matchLabels")
192                .and_then(|m| m.as_object())
193                .map(|o| {
194                    o.iter()
195                        .map(|(k, v)| (k.as_str(), v.as_str().unwrap_or("")))
196                        .collect::<Vec<_>>()
197                })
198                .unwrap_or_default();
199
200            matching_plans
201                .into_iter()
202                .filter(|p| {
203                    let labels = p
204                        .object
205                        .get("metadata")
206                        .and_then(|m| m.get("labels"))
207                        .and_then(|l| l.as_object());
208
209                    match_labels.iter().all(|(k, v)| {
210                        labels.and_then(|l| l.get(*k)).and_then(|lv| lv.as_str()) == Some(*v)
211                    })
212                })
213                .collect()
214        } else {
215            matching_plans
216        };
217
218        if filtered_plans.is_empty() {
219            debug!(goal = goal_name, "No plans match selector for goal");
220            return Ok(());
221        }
222
223        // Step 3: Group by series
224        let mut by_series: HashMap<Option<String>, Vec<_>> = HashMap::new();
225        for plan in &filtered_plans {
226            let series = plan
227                .object
228                .get("spec")
229                .and_then(|s| s.get("series"))
230                .and_then(|s| s.as_str())
231                .map(|s| s.to_string());
232            by_series.entry(series).or_default().push(plan);
233        }
234
235        // Step 4: Within each series, select highest version
236        let mut candidates: Vec<&crate::storage::StoredObject> = Vec::new();
237        for (_series, series_plans) in by_series {
238            let best = series_plans.into_iter().max_by(|a, b| {
239                let v_a = a
240                    .object
241                    .get("spec")
242                    .and_then(|s| s.get("version"))
243                    .and_then(|v| v.as_str())
244                    .unwrap_or("0");
245                let v_b = b
246                    .object
247                    .get("spec")
248                    .and_then(|s| s.get("version"))
249                    .and_then(|v| v.as_str())
250                    .unwrap_or("0");
251                compare_versions(v_a, v_b)
252            });
253            if let Some(p) = best {
254                candidates.push(p);
255            }
256        }
257
258        // Step 5: Exclude superseded plans
259        let superseded: HashSet<String> = candidates
260            .iter()
261            .flat_map(|p| {
262                p.object
263                    .get("spec")
264                    .and_then(|s| s.get("supersedes"))
265                    .and_then(|ss| ss.as_array())
266                    .map(|arr| {
267                        arr.iter()
268                            .filter_map(|r| r.get("name").and_then(|n| n.as_str()))
269                            .map(|s| s.to_string())
270                            .collect::<Vec<_>>()
271                    })
272                    .unwrap_or_default()
273            })
274            .collect();
275
276        let final_candidates: Vec<_> = candidates
277            .into_iter()
278            .filter(|p| !superseded.contains(&p.name))
279            .collect();
280
281        // Select the best plan (highest priority/version among candidates)
282        let selected = final_candidates.into_iter().max_by(|a, b| {
283            let v_a = a
284                .object
285                .get("spec")
286                .and_then(|s| s.get("version"))
287                .and_then(|v| v.as_str())
288                .unwrap_or("0");
289            let v_b = b
290                .object
291                .get("spec")
292                .and_then(|s| s.get("version"))
293                .and_then(|v| v.as_str())
294                .unwrap_or("0");
295            compare_versions(v_a, v_b)
296        });
297
298        // Step 6: Update Goal.status.activePlanRef
299        if let Some(plan) = selected {
300            let plan_name = &plan.name;
301            if current_plan_ref != Some(plan_name.as_str()) {
302                info!(
303                    goal = goal_name,
304                    plan = %plan_name,
305                    namespace,
306                    "Resolved plan for goal"
307                );
308                self.update_goal_status(namespace, goal_name, Some(plan_name))
309                    .await?;
310            }
311        }
312
313        Ok(())
314    }
315
316    /// Update Goal status with the resolved plan reference
317    async fn update_goal_status(
318        &self,
319        namespace: &str,
320        goal_name: &str,
321        plan_name: Option<&str>,
322    ) -> Result<()> {
323        // Get current goal
324        let goal = self
325            .store
326            .get(namespace, "Goal", goal_name)
327            .await?
328            .ok_or_else(|| anyhow::anyhow!("Goal not found"))?;
329
330        let mut updated = goal.object.clone();
331
332        // Get current generation for observedGeneration
333        let generation = updated
334            .get("metadata")
335            .and_then(|m| m.get("generation"))
336            .and_then(|g| g.as_i64())
337            .unwrap_or(1);
338
339        let now = chrono::Utc::now().to_rfc3339();
340
341        // Initialize status if not present
342        if updated.get("status").is_none() {
343            updated["status"] = json!({});
344        }
345
346        // Set observedGeneration
347        updated["status"]["observedGeneration"] = json!(generation);
348
349        // Get or create conditions array
350        let mut conditions = updated
351            .get("status")
352            .and_then(|s| s.get("conditions"))
353            .and_then(|c| c.as_array())
354            .cloned()
355            .unwrap_or_default();
356
357        // Set activePlanRef and phase
358        if let Some(plan) = plan_name {
359            updated["status"]["activePlanRef"] = json!({
360                "name": plan,
361                "namespace": namespace
362            });
363            updated["status"]["phase"] = json!("Ready");
364
365            // Update or add PlanResolved condition
366            update_condition(
367                &mut conditions,
368                "PlanResolved",
369                "True",
370                "PlanFound",
371                &format!("Plan '{}' resolved for goal", plan),
372                &now,
373                generation,
374            );
375        } else {
376            // Remove activePlanRef
377            if let Some(status) = updated.get_mut("status").and_then(|s| s.as_object_mut()) {
378                status.remove("activePlanRef");
379            }
380            updated["status"]["phase"] = json!("Pending");
381
382            // Update or add PlanResolved condition
383            update_condition(
384                &mut conditions,
385                "PlanResolved",
386                "False",
387                "NoPlanFound",
388                "No matching plan found for goal",
389                &now,
390                generation,
391            );
392        }
393
394        updated["status"]["conditions"] = json!(conditions);
395
396        // Update in store
397        let (_, event) = self
398            .store
399            .replace(namespace, "Goal", goal_name, updated, None)
400            .await?;
401
402        // Broadcast change
403        self.broadcaster.send(event);
404
405        Ok(())
406    }
407}
408
409/// Update or add a condition in the conditions array
410fn update_condition(
411    conditions: &mut Vec<Value>,
412    condition_type: &str,
413    status: &str,
414    reason: &str,
415    message: &str,
416    timestamp: &str,
417    generation: i64,
418) {
419    let new_condition = json!({
420        "type": condition_type,
421        "status": status,
422        "reason": reason,
423        "message": message,
424        "lastTransitionTime": timestamp,
425        "observedGeneration": generation
426    });
427
428    // Find existing condition of this type
429    if let Some(pos) = conditions
430        .iter()
431        .position(|c| c.get("type").and_then(|t| t.as_str()) == Some(condition_type))
432    {
433        // Check if status changed
434        let old_status = conditions[pos].get("status").and_then(|s| s.as_str());
435        if old_status != Some(status) {
436            // Status changed - update lastTransitionTime
437            conditions[pos] = new_condition;
438        } else {
439            // Status unchanged - only update message/reason/observedGeneration
440            conditions[pos]["reason"] = json!(reason);
441            conditions[pos]["message"] = json!(message);
442            conditions[pos]["observedGeneration"] = json!(generation);
443        }
444    } else {
445        // Add new condition
446        conditions.push(new_condition);
447    }
448}
449
/// Compare two version strings (plain integers or dot-separated numbers).
///
/// Both inputs are first normalised: surrounding whitespace is trimmed and a
/// single leading `v`/`V` directly followed by a digit is dropped, so `"v2"`
/// and `"2"` are the same version.
///
/// Comparison rules:
/// * if both normalised strings parse as integers they are compared
///   numerically (`"10"` > `"2"`);
/// * otherwise each string is split on `.` and the numeric components are
///   compared left to right; when one is a prefix of the other, the one with
///   more components is greater (`"1.1.0"` > `"1.1"`).
///
/// Components that are not integers (e.g. `"0-beta"`) are skipped rather than
/// rejected, so strings without any numeric component compare as equal.
fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
    // Trim whitespace and strip one `v`/`V` prefix when a digit follows it.
    fn normalize(raw: &str) -> &str {
        let trimmed = raw.trim();
        match trimmed.strip_prefix(|c: char| c == 'v' || c == 'V') {
            Some(rest) if rest.starts_with(|c: char| c.is_ascii_digit()) => rest,
            _ => trimmed,
        }
    }

    // Numeric components of a dotted version; non-numeric ones are skipped.
    fn numeric_parts(version: &str) -> Vec<i64> {
        version
            .split('.')
            .filter_map(|part| part.parse().ok())
            .collect()
    }

    let (a, b) = (normalize(a), normalize(b));

    // Plain integers: compare numerically.
    if let (Ok(na), Ok(nb)) = (a.parse::<i64>(), b.parse::<i64>()) {
        return na.cmp(&nb);
    }

    // Lexicographic ordering on the component lists: first differing component
    // decides, and a strict prefix sorts before the longer version.
    numeric_parts(a).cmp(&numeric_parts(b))
}
471
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Table of `(left, right, expected ordering)` cases for `compare_versions`.
    #[test]
    fn test_compare_versions() {
        let cases = [
            ("1", "2", Ordering::Less),
            ("10", "2", Ordering::Greater),
            ("1.0", "1.1", Ordering::Less),
            ("1.1.0", "1.1", Ordering::Greater),
            ("2.0", "1.9", Ordering::Greater),
        ];

        for (left, right, expected) in cases.iter() {
            assert_eq!(compare_versions(left, right), *expected);
        }
    }
}