Skip to main content

pitchfork_cli/
deps.rs

1use crate::Result;
2use crate::daemon_id::DaemonId;
3use crate::error::{DependencyError, find_similar_daemon};
4use crate::pitchfork_toml::PitchforkTomlDaemon;
5use indexmap::IndexMap;
6use std::collections::{HashMap, HashSet, VecDeque};
7
8use crate::pitchfork_toml::PitchforkToml;
9
10/// Result of dependency resolution
11#[derive(Debug)]
12pub struct DependencyOrder {
13    /// Groups of daemons that can be started in parallel.
14    /// Each level depends only on daemons in previous levels.
15    pub levels: Vec<Vec<DaemonId>>,
16}
17
18/// Resolve dependency order using Kahn's algorithm (topological sort).
19///
20/// Returns daemons grouped into levels where:
21/// - Level 0: daemons with no dependencies (or deps already satisfied)
22/// - Level 1: daemons that only depend on level 0
23/// - Level N: daemons that only depend on levels 0..(N-1)
24///
25/// Daemons within the same level can be started in parallel.
26pub fn resolve_dependencies(
27    requested: &[DaemonId],
28    all_daemons: &IndexMap<DaemonId, PitchforkTomlDaemon>,
29) -> Result<DependencyOrder> {
30    // 1. Build the full set of daemons to start (requested + transitive deps)
31    let mut to_start: HashSet<DaemonId> = HashSet::new();
32    let mut queue: VecDeque<DaemonId> = requested.iter().cloned().collect();
33
34    while let Some(id) = queue.pop_front() {
35        if to_start.contains(&id) {
36            continue;
37        }
38
39        let daemon = all_daemons.get(&id).ok_or_else(|| {
40            let suggestion = find_similar_daemon(
41                &id.qualified(),
42                all_daemons
43                    .keys()
44                    .map(|k| k.qualified())
45                    .collect::<Vec<_>>()
46                    .iter()
47                    .map(|s| s.as_str()),
48            );
49            DependencyError::DaemonNotFound {
50                name: id.qualified(),
51                suggestion,
52            }
53        })?;
54
55        to_start.insert(id.clone());
56
57        // Add dependencies to queue
58        for dep in &daemon.depends {
59            if !all_daemons.contains_key(dep) {
60                return Err(DependencyError::MissingDependency {
61                    daemon: id.qualified(),
62                    dependency: dep.qualified(),
63                }
64                .into());
65            }
66            if !to_start.contains(dep) {
67                queue.push_back(dep.clone());
68            }
69        }
70    }
71
72    // 2. Build adjacency list and in-degree map
73    let mut in_degree: HashMap<DaemonId, usize> = HashMap::new();
74    let mut dependents: HashMap<DaemonId, Vec<DaemonId>> = HashMap::new();
75
76    for id in &to_start {
77        in_degree.entry(id.clone()).or_insert(0);
78        dependents.entry(id.clone()).or_default();
79    }
80
81    for id in &to_start {
82        let daemon = all_daemons.get(id).ok_or_else(|| {
83            miette::miette!("Internal error: daemon '{}' missing from configuration", id)
84        })?;
85        for dep in &daemon.depends {
86            if to_start.contains(dep) {
87                *in_degree.get_mut(id).ok_or_else(|| {
88                    miette::miette!("Internal error: in_degree missing for daemon '{}'", id)
89                })? += 1;
90                dependents
91                    .get_mut(dep)
92                    .ok_or_else(|| {
93                        miette::miette!("Internal error: dependents missing for daemon '{}'", dep)
94                    })?
95                    .push(id.clone());
96            }
97        }
98    }
99
100    // 3. Kahn's algorithm with level tracking
101    let mut processed: HashSet<DaemonId> = HashSet::new();
102    let mut levels: Vec<Vec<DaemonId>> = Vec::new();
103    let mut current_level: Vec<DaemonId> = in_degree
104        .iter()
105        .filter(|(_, deg)| **deg == 0)
106        .map(|(id, _)| id.clone())
107        .collect();
108
109    // Sort for deterministic order
110    current_level.sort();
111
112    while !current_level.is_empty() {
113        let mut next_level = Vec::new();
114
115        for id in &current_level {
116            processed.insert(id.clone());
117
118            let deps = dependents.get(id).ok_or_else(|| {
119                miette::miette!("Internal error: dependents missing for daemon '{}'", id)
120            })?;
121            for dependent in deps {
122                let deg = in_degree.get_mut(dependent).ok_or_else(|| {
123                    miette::miette!(
124                        "Internal error: in_degree missing for daemon '{}'",
125                        dependent
126                    )
127                })?;
128                *deg -= 1;
129                if *deg == 0 {
130                    next_level.push(dependent.clone());
131                }
132            }
133        }
134
135        levels.push(current_level);
136        next_level.sort(); // Sort for deterministic order
137        current_level = next_level;
138    }
139
140    // 4. Check for cycles
141    if processed.len() != to_start.len() {
142        let mut involved: Vec<_> = to_start
143            .difference(&processed)
144            .map(|id| id.qualified())
145            .collect();
146        involved.sort(); // Deterministic output
147        return Err(DependencyError::CircularDependency { involved }.into());
148    }
149
150    Ok(DependencyOrder { levels })
151}
152
153/// Compute the order in which daemons should be stopped, respecting
154/// reverse dependency order (dependents first, then their dependencies).
155///
156/// This is a shared helper used by both the supervisor's `close()` and
157/// the IPC `stop_daemons()` batch operation.
158///
159/// Returns a list of levels in reverse dependency order. Each level is a
160/// `Vec<DaemonId>` of daemons that can be stopped concurrently.
161/// Ad-hoc daemons (not in config) are placed in the first level.
162///
163/// Falls back to a single level containing all IDs if config loading
164/// or dependency resolution fails.
165pub fn compute_reverse_stop_order(active_ids: &[DaemonId]) -> Vec<Vec<DaemonId>> {
166    compute_reverse_stop_order_with_config(active_ids, None)
167}
168
169/// Like [`compute_reverse_stop_order`] but accepts a pre-loaded config to
170/// avoid redundant disk I/O when the caller already has one.
171pub fn compute_reverse_stop_order_with_config(
172    active_ids: &[DaemonId],
173    config: Option<&PitchforkToml>,
174) -> Vec<Vec<DaemonId>> {
175    if active_ids.is_empty() {
176        return Vec::new();
177    }
178
179    let owned_pt;
180    let pt = match config {
181        Some(pt) => pt,
182        None => match PitchforkToml::all_merged() {
183            Ok(loaded) => {
184                owned_pt = loaded;
185                &owned_pt
186            }
187            Err(e) => {
188                warn!(
189                    "failed to load config for dependency-ordered shutdown, stopping in arbitrary order: {e}"
190                );
191                return vec![active_ids.to_vec()];
192            }
193        },
194    };
195
196    let active_set: HashSet<&DaemonId> = active_ids.iter().collect();
197    let config_ids: Vec<DaemonId> = active_ids
198        .iter()
199        .filter(|id| pt.daemons.contains_key(*id))
200        .cloned()
201        .collect();
202    let adhoc_ids: Vec<DaemonId> = active_ids
203        .iter()
204        .filter(|id| !pt.daemons.contains_key(*id))
205        .cloned()
206        .collect();
207
208    if config_ids.is_empty() {
209        // All ad-hoc daemons, no dependency ordering needed
210        return vec![active_ids.to_vec()];
211    }
212
213    match resolve_dependencies(&config_ids, &pt.daemons) {
214        Ok(dep_order) => {
215            let mut levels: Vec<Vec<DaemonId>> = Vec::new();
216
217            // Stop ad-hoc daemons first (they have no dependency info)
218            if !adhoc_ids.is_empty() {
219                levels.push(adhoc_ids);
220            }
221
222            // Then stop config daemons in reverse dependency order
223            for level in dep_order.levels.into_iter().rev() {
224                let filtered: Vec<DaemonId> = level
225                    .into_iter()
226                    .filter(|id| active_set.contains(id))
227                    .collect();
228                if !filtered.is_empty() {
229                    levels.push(filtered);
230                }
231            }
232
233            debug!("shutdown order: {levels:?}");
234            levels
235        }
236        Err(e) => {
237            warn!("dependency resolution failed during shutdown, stopping in arbitrary order: {e}");
238            vec![active_ids.to_vec()]
239        }
240    }
241}
242
243#[cfg(test)]
244mod tests {
245    use super::*;
246    use crate::daemon_id::DaemonId;
247    use crate::pitchfork_toml::{PitchforkTomlDaemon, Retry};
248    use indexmap::IndexMap;
249
250    // Helper to build a test daemon with only `depends` set, all other fields default/None.
251    // Keeps tests concise while satisfying all required struct fields.
252
253    fn make_daemon(depends: Vec<&str>) -> PitchforkTomlDaemon {
254        PitchforkTomlDaemon {
255            run: "echo test".to_string(),
256            auto: vec![],
257            cron: None,
258            retry: Retry::default(),
259            ready_delay: None,
260            ready_output: None,
261            ready_http: None,
262            ready_port: None,
263            ready_cmd: None,
264            expected_port: Vec::new(),
265            auto_bump_port: false,
266            port_bump_attempts: 10,
267            boot_start: None,
268            depends: depends
269                .into_iter()
270                .map(|s| DaemonId::new("global", s))
271                .collect(),
272            watch: vec![],
273            dir: None,
274            env: None,
275            hooks: None,
276            path: None,
277            mise: None,
278            memory_limit: None,
279            cpu_limit: None,
280        }
281    }
282
283    fn id(name: &str) -> DaemonId {
284        DaemonId::new("global", name)
285    }
286
287    #[test]
288    fn test_no_dependencies() {
289        let mut daemons = IndexMap::new();
290        daemons.insert(id("api"), make_daemon(vec![]));
291
292        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
293
294        assert_eq!(result.levels.len(), 1);
295        assert_eq!(result.levels[0], vec![id("api")]);
296    }
297
298    #[test]
299    fn test_simple_dependency() {
300        let mut daemons = IndexMap::new();
301        daemons.insert(id("postgres"), make_daemon(vec![]));
302        daemons.insert(id("api"), make_daemon(vec!["postgres"]));
303
304        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
305
306        assert_eq!(result.levels.len(), 2);
307        assert_eq!(result.levels[0], vec![id("postgres")]);
308        assert_eq!(result.levels[1], vec![id("api")]);
309    }
310
311    #[test]
312    fn test_multiple_dependencies() {
313        let mut daemons = IndexMap::new();
314        daemons.insert(id("postgres"), make_daemon(vec![]));
315        daemons.insert(id("redis"), make_daemon(vec![]));
316        daemons.insert(id("api"), make_daemon(vec!["postgres", "redis"]));
317
318        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
319
320        assert_eq!(result.levels.len(), 2);
321        // postgres and redis can start in parallel
322        assert!(result.levels[0].contains(&id("postgres")));
323        assert!(result.levels[0].contains(&id("redis")));
324        assert_eq!(result.levels[1], vec![id("api")]);
325    }
326
327    #[test]
328    fn test_transitive_dependencies() {
329        let mut daemons = IndexMap::new();
330        daemons.insert(id("database"), make_daemon(vec![]));
331        daemons.insert(id("backend"), make_daemon(vec!["database"]));
332        daemons.insert(id("api"), make_daemon(vec!["backend"]));
333
334        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
335
336        assert_eq!(result.levels.len(), 3);
337        assert_eq!(result.levels[0], vec![id("database")]);
338        assert_eq!(result.levels[1], vec![id("backend")]);
339        assert_eq!(result.levels[2], vec![id("api")]);
340    }
341
342    #[test]
343    fn test_diamond_dependency() {
344        let mut daemons = IndexMap::new();
345        daemons.insert(id("db"), make_daemon(vec![]));
346        daemons.insert(id("auth"), make_daemon(vec!["db"]));
347        daemons.insert(id("data"), make_daemon(vec!["db"]));
348        daemons.insert(id("api"), make_daemon(vec!["auth", "data"]));
349
350        let result = resolve_dependencies(&[id("api")], &daemons).unwrap();
351
352        assert_eq!(result.levels.len(), 3);
353        assert_eq!(result.levels[0], vec![id("db")]);
354        // auth and data can start in parallel
355        assert!(result.levels[1].contains(&id("auth")));
356        assert!(result.levels[1].contains(&id("data")));
357        assert_eq!(result.levels[2], vec![id("api")]);
358    }
359
360    #[test]
361    fn test_circular_dependency_detected() {
362        let mut daemons = IndexMap::new();
363        daemons.insert(id("a"), make_daemon(vec!["c"]));
364        daemons.insert(id("b"), make_daemon(vec!["a"]));
365        daemons.insert(id("c"), make_daemon(vec!["b"]));
366
367        let result = resolve_dependencies(&[id("a")], &daemons);
368
369        assert!(result.is_err());
370        let err = result.unwrap_err().to_string();
371        assert!(err.contains("circular dependency"));
372    }
373
374    #[test]
375    fn test_missing_dependency_error() {
376        let mut daemons = IndexMap::new();
377        let mut daemon = make_daemon(vec![]);
378        daemon.depends = vec![DaemonId::new("global", "nonexistent")];
379        daemons.insert(id("api"), daemon);
380
381        let result = resolve_dependencies(&[id("api")], &daemons);
382
383        assert!(result.is_err());
384        let err = result.unwrap_err().to_string();
385        assert!(err.contains("nonexistent"));
386        assert!(err.contains("not defined"));
387    }
388
389    #[test]
390    fn test_missing_requested_daemon_error() {
391        let daemons = IndexMap::new();
392
393        let result = resolve_dependencies(&[id("nonexistent")], &daemons);
394
395        assert!(result.is_err());
396        let err = result.unwrap_err().to_string();
397        assert!(err.contains("nonexistent"));
398        assert!(err.contains("not found"));
399    }
400
401    #[test]
402    fn test_multiple_requested_daemons() {
403        let mut daemons = IndexMap::new();
404        daemons.insert(id("db"), make_daemon(vec![]));
405        daemons.insert(id("api"), make_daemon(vec!["db"]));
406        daemons.insert(id("worker"), make_daemon(vec!["db"]));
407
408        let result = resolve_dependencies(&[id("api"), id("worker")], &daemons).unwrap();
409
410        assert_eq!(result.levels.len(), 2);
411        assert_eq!(result.levels[0], vec![id("db")]);
412        // api and worker can start in parallel
413        assert!(result.levels[1].contains(&id("api")));
414        assert!(result.levels[1].contains(&id("worker")));
415    }
416
417    #[test]
418    fn test_start_all_with_dependencies() {
419        let mut daemons = IndexMap::new();
420        daemons.insert(id("db"), make_daemon(vec![]));
421        daemons.insert(id("cache"), make_daemon(vec![]));
422        daemons.insert(id("api"), make_daemon(vec!["db", "cache"]));
423        daemons.insert(id("worker"), make_daemon(vec!["db"]));
424
425        let all_ids: Vec<DaemonId> = daemons.keys().cloned().collect();
426        let result = resolve_dependencies(&all_ids, &daemons).unwrap();
427
428        assert_eq!(result.levels.len(), 2);
429        // db and cache have no deps
430        assert!(result.levels[0].contains(&id("db")));
431        assert!(result.levels[0].contains(&id("cache")));
432        // api and worker depend on level 0
433        assert!(result.levels[1].contains(&id("api")));
434        assert!(result.levels[1].contains(&id("worker")));
435    }
436}