Skip to main content

pitchfork_cli/
deps.rs

1use crate::Result;
2use crate::daemon_id::DaemonId;
3use crate::error::{DependencyError, find_similar_daemon};
4use crate::pitchfork_toml::PitchforkTomlDaemon;
5use indexmap::IndexMap;
6use std::collections::{HashMap, HashSet, VecDeque};
7
8use crate::pitchfork_toml::PitchforkToml;
9
10/// Result of dependency resolution
11#[derive(Debug)]
12pub struct DependencyOrder {
13    /// Groups of daemons that can be started in parallel.
14    /// Each level depends only on daemons in previous levels.
15    pub levels: Vec<Vec<DaemonId>>,
16}
17
18/// Resolve dependency order using Kahn's algorithm (topological sort).
19///
20/// Returns daemons grouped into levels where:
21/// - Level 0: daemons with no dependencies (or deps already satisfied)
22/// - Level 1: daemons that only depend on level 0
23/// - Level N: daemons that only depend on levels 0..(N-1)
24///
25/// Daemons within the same level can be started in parallel.
26pub fn resolve_dependencies(
27    requested: &[DaemonId],
28    all_daemons: &IndexMap<DaemonId, PitchforkTomlDaemon>,
29) -> Result<DependencyOrder> {
30    // 1. Build the full set of daemons to start (requested + transitive deps)
31    let mut to_start: HashSet<DaemonId> = HashSet::new();
32    let mut queue: VecDeque<DaemonId> = requested.iter().cloned().collect();
33
34    while let Some(id) = queue.pop_front() {
35        if to_start.contains(&id) {
36            continue;
37        }
38
39        let daemon = all_daemons.get(&id).ok_or_else(|| {
40            let suggestion = find_similar_daemon(
41                &id.qualified(),
42                all_daemons
43                    .keys()
44                    .map(|k| k.qualified())
45                    .collect::<Vec<_>>()
46                    .iter()
47                    .map(|s| s.as_str()),
48            );
49            DependencyError::DaemonNotFound {
50                name: id.qualified(),
51                suggestion,
52            }
53        })?;
54
55        to_start.insert(id.clone());
56
57        // Add dependencies to queue
58        for dep in &daemon.depends {
59            if !all_daemons.contains_key(dep) {
60                return Err(DependencyError::MissingDependency {
61                    daemon: id.qualified(),
62                    dependency: dep.qualified(),
63                }
64                .into());
65            }
66            if !to_start.contains(dep) {
67                queue.push_back(dep.clone());
68            }
69        }
70    }
71
72    // 2. Build adjacency list and in-degree map
73    let mut in_degree: HashMap<DaemonId, usize> = HashMap::new();
74    let mut dependents: HashMap<DaemonId, Vec<DaemonId>> = HashMap::new();
75
76    for id in &to_start {
77        in_degree.entry(id.clone()).or_insert(0);
78        dependents.entry(id.clone()).or_default();
79    }
80
81    for id in &to_start {
82        let daemon = all_daemons.get(id).ok_or_else(|| {
83            miette::miette!("Internal error: daemon '{}' missing from configuration", id)
84        })?;
85        for dep in &daemon.depends {
86            if to_start.contains(dep) {
87                *in_degree.get_mut(id).ok_or_else(|| {
88                    miette::miette!("Internal error: in_degree missing for daemon '{}'", id)
89                })? += 1;
90                dependents
91                    .get_mut(dep)
92                    .ok_or_else(|| {
93                        miette::miette!("Internal error: dependents missing for daemon '{}'", dep)
94                    })?
95                    .push(id.clone());
96            }
97        }
98    }
99
100    // 3. Kahn's algorithm with level tracking
101    let mut processed: HashSet<DaemonId> = HashSet::new();
102    let mut levels: Vec<Vec<DaemonId>> = Vec::new();
103    let mut current_level: Vec<DaemonId> = in_degree
104        .iter()
105        .filter(|(_, deg)| **deg == 0)
106        .map(|(id, _)| id.clone())
107        .collect();
108
109    // Sort for deterministic order
110    current_level.sort();
111
112    while !current_level.is_empty() {
113        let mut next_level = Vec::new();
114
115        for id in &current_level {
116            processed.insert(id.clone());
117
118            let deps = dependents.get(id).ok_or_else(|| {
119                miette::miette!("Internal error: dependents missing for daemon '{}'", id)
120            })?;
121            for dependent in deps {
122                let deg = in_degree.get_mut(dependent).ok_or_else(|| {
123                    miette::miette!(
124                        "Internal error: in_degree missing for daemon '{}'",
125                        dependent
126                    )
127                })?;
128                *deg -= 1;
129                if *deg == 0 {
130                    next_level.push(dependent.clone());
131                }
132            }
133        }
134
135        levels.push(current_level);
136        next_level.sort(); // Sort for deterministic order
137        current_level = next_level;
138    }
139
140    // 4. Check for cycles
141    if processed.len() != to_start.len() {
142        let mut involved: Vec<_> = to_start
143            .difference(&processed)
144            .map(|id| id.qualified())
145            .collect();
146        involved.sort(); // Deterministic output
147        return Err(DependencyError::CircularDependency { involved }.into());
148    }
149
150    Ok(DependencyOrder { levels })
151}
152
153/// Compute the order in which daemons should be stopped, respecting
154/// reverse dependency order (dependents first, then their dependencies).
155///
156/// This is a shared helper used by both the supervisor's `close()` and
157/// the IPC `stop_daemons()` batch operation.
158///
159/// Returns a list of levels in reverse dependency order. Each level is a
160/// `Vec<DaemonId>` of daemons that can be stopped concurrently.
161/// Ad-hoc daemons (not in config) are placed in the first level.
162///
163/// Falls back to a single level containing all IDs if config loading
164/// or dependency resolution fails.
165pub fn compute_reverse_stop_order(active_ids: &[DaemonId]) -> Vec<Vec<DaemonId>> {
166    compute_reverse_stop_order_with_config(active_ids, None)
167}
168
169/// Like [`compute_reverse_stop_order`] but accepts a pre-loaded config to
170/// avoid redundant disk I/O when the caller already has one.
171pub fn compute_reverse_stop_order_with_config(
172    active_ids: &[DaemonId],
173    config: Option<&PitchforkToml>,
174) -> Vec<Vec<DaemonId>> {
175    if active_ids.is_empty() {
176        return Vec::new();
177    }
178
179    let owned_pt;
180    let pt = match config {
181        Some(pt) => pt,
182        None => match PitchforkToml::all_merged() {
183            Ok(loaded) => {
184                owned_pt = loaded;
185                &owned_pt
186            }
187            Err(e) => {
188                warn!(
189                    "failed to load config for dependency-ordered shutdown, stopping in arbitrary order: {e}"
190                );
191                return vec![active_ids.to_vec()];
192            }
193        },
194    };
195
196    let active_set: HashSet<&DaemonId> = active_ids.iter().collect();
197    let config_ids: Vec<DaemonId> = active_ids
198        .iter()
199        .filter(|id| pt.daemons.contains_key(*id))
200        .cloned()
201        .collect();
202    let adhoc_ids: Vec<DaemonId> = active_ids
203        .iter()
204        .filter(|id| !pt.daemons.contains_key(*id))
205        .cloned()
206        .collect();
207
208    if config_ids.is_empty() {
209        // All ad-hoc daemons, no dependency ordering needed
210        return vec![active_ids.to_vec()];
211    }
212
213    match resolve_dependencies(&config_ids, &pt.daemons) {
214        Ok(dep_order) => {
215            let mut levels: Vec<Vec<DaemonId>> = Vec::new();
216
217            // Stop ad-hoc daemons first (they have no dependency info)
218            if !adhoc_ids.is_empty() {
219                levels.push(adhoc_ids);
220            }
221
222            // Then stop config daemons in reverse dependency order
223            for level in dep_order.levels.into_iter().rev() {
224                let filtered: Vec<DaemonId> = level
225                    .into_iter()
226                    .filter(|id| active_set.contains(id))
227                    .collect();
228                if !filtered.is_empty() {
229                    levels.push(filtered);
230                }
231            }
232
233            debug!("shutdown order: {levels:?}");
234            levels
235        }
236        Err(e) => {
237            warn!("dependency resolution failed during shutdown, stopping in arbitrary order: {e}");
238            vec![active_ids.to_vec()]
239        }
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use crate::daemon_id::DaemonId;
247    use crate::pitchfork_toml::PitchforkTomlDaemon;
248    use indexmap::IndexMap;
249
250    // Helper to build a test daemon with only `depends` set, all other fields default/None.
251    // Keeps tests concise while satisfying all required struct fields.
252
253    fn make_daemon(depends: Vec<&str>) -> PitchforkTomlDaemon {
254        PitchforkTomlDaemon {
255            run: "echo test".to_string(),
256            depends: depends
257                .into_iter()
258                .map(|s| DaemonId::new("global", s))
259                .collect(),
260            ..Default::default()
261        }
262    }
263
264    fn id(name: &str) -> DaemonId {
265        DaemonId::new("global", name)
266    }
267
268    #[test]
269    fn test_no_dependencies() {
270        let mut daemons = IndexMap::new();
271        daemons.insert(id("api"), make_daemon(vec![]));
272
273        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
274
275        assert_eq!(result.levels.len(), 1);
276        assert_eq!(result.levels[0], vec![id("api")]);
277    }
278
279    #[test]
280    fn test_simple_dependency() {
281        let mut daemons = IndexMap::new();
282        daemons.insert(id("postgres"), make_daemon(vec![]));
283        daemons.insert(id("api"), make_daemon(vec!["postgres"]));
284
285        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
286
287        assert_eq!(result.levels.len(), 2);
288        assert_eq!(result.levels[0], vec![id("postgres")]);
289        assert_eq!(result.levels[1], vec![id("api")]);
290    }
291
292    #[test]
293    fn test_multiple_dependencies() {
294        let mut daemons = IndexMap::new();
295        daemons.insert(id("postgres"), make_daemon(vec![]));
296        daemons.insert(id("redis"), make_daemon(vec![]));
297        daemons.insert(id("api"), make_daemon(vec!["postgres", "redis"]));
298
299        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
300
301        assert_eq!(result.levels.len(), 2);
302        // postgres and redis can start in parallel
303        assert!(result.levels[0].contains(&id("postgres")));
304        assert!(result.levels[0].contains(&id("redis")));
305        assert_eq!(result.levels[1], vec![id("api")]);
306    }
307
308    #[test]
309    fn test_transitive_dependencies() {
310        let mut daemons = IndexMap::new();
311        daemons.insert(id("database"), make_daemon(vec![]));
312        daemons.insert(id("backend"), make_daemon(vec!["database"]));
313        daemons.insert(id("api"), make_daemon(vec!["backend"]));
314
315        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
316
317        assert_eq!(result.levels.len(), 3);
318        assert_eq!(result.levels[0], vec![id("database")]);
319        assert_eq!(result.levels[1], vec![id("backend")]);
320        assert_eq!(result.levels[2], vec![id("api")]);
321    }
322
323    #[test]
324    fn test_diamond_dependency() {
325        let mut daemons = IndexMap::new();
326        daemons.insert(id("db"), make_daemon(vec![]));
327        daemons.insert(id("auth"), make_daemon(vec!["db"]));
328        daemons.insert(id("data"), make_daemon(vec!["db"]));
329        daemons.insert(id("api"), make_daemon(vec!["auth", "data"]));
330
331        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
332
333        assert_eq!(result.levels.len(), 3);
334        assert_eq!(result.levels[0], vec![id("db")]);
335        // auth and data can start in parallel
336        assert!(result.levels[1].contains(&id("auth")));
337        assert!(result.levels[1].contains(&id("data")));
338        assert_eq!(result.levels[2], vec![id("api")]);
339    }
340
341    #[test]
342    fn test_circular_dependency_detected() {
343        let mut daemons = IndexMap::new();
344        daemons.insert(id("a"), make_daemon(vec!["c"]));
345        daemons.insert(id("b"), make_daemon(vec!["a"]));
346        daemons.insert(id("c"), make_daemon(vec!["b"]));
347
348        let result = resolve_dependencies(&[id("a")], &daemons);
349
350        assert!(result.is_err());
351        let err = result.unwrap_err().to_string();
352        assert!(err.contains("circular dependency"));
353    }
354
355    #[test]
356    fn test_missing_dependency_error() {
357        let mut daemons = IndexMap::new();
358        let mut daemon = make_daemon(vec![]);
359        daemon.depends = vec![DaemonId::new("global", "nonexistent")];
360        daemons.insert(id("api"), daemon);
361
362        let result = resolve_dependencies(&[id("api")], &daemons);
363
364        assert!(result.is_err());
365        let err = result.unwrap_err().to_string();
366        assert!(err.contains("nonexistent"));
367        assert!(err.contains("not defined"));
368    }
369
370    #[test]
371    fn test_missing_requested_daemon_error() {
372        let daemons = IndexMap::new();
373
374        let result = resolve_dependencies(&[id("nonexistent")], &daemons);
375
376        assert!(result.is_err());
377        let err = result.unwrap_err().to_string();
378        assert!(err.contains("nonexistent"));
379        assert!(err.contains("not found"));
380    }
381
382    #[test]
383    fn test_multiple_requested_daemons() {
384        let mut daemons = IndexMap::new();
385        daemons.insert(id("db"), make_daemon(vec![]));
386        daemons.insert(id("api"), make_daemon(vec!["db"]));
387        daemons.insert(id("worker"), make_daemon(vec!["db"]));
388
389        let result = resolve_dependencies(&[id("api"), id("worker")], &daemons).unwrap();
390
391        assert_eq!(result.levels.len(), 2);
392        assert_eq!(result.levels[0], vec![id("db")]);
393        // api and worker can start in parallel
394        assert!(result.levels[1].contains(&id("api")));
395        assert!(result.levels[1].contains(&id("worker")));
396    }
397
398    #[test]
399    fn test_start_all_with_dependencies() {
400        let mut daemons = IndexMap::new();
401        daemons.insert(id("db"), make_daemon(vec![]));
402        daemons.insert(id("cache"), make_daemon(vec![]));
403        daemons.insert(id("api"), make_daemon(vec!["db", "cache"]));
404        daemons.insert(id("worker"), make_daemon(vec!["db"]));
405
406        let all_ids: Vec<DaemonId> = daemons.keys().cloned().collect();
407        let result = resolve_dependencies(&all_ids, &daemons).unwrap();
408
409        assert_eq!(result.levels.len(), 2);
410        // db and cache have no deps
411        assert!(result.levels[0].contains(&id("db")));
412        assert!(result.levels[0].contains(&id("cache")));
413        // api and worker depend on level 0
414        assert!(result.levels[1].contains(&id("api")));
415        assert!(result.levels[1].contains(&id("worker")));
416    }
417}