Skip to main content

consortium_nix/
lib.rs

//! # consortium-nix
//!
//! NixOS deployment orchestration for consortium.
//!
//! This crate provides the deployment pipeline for NixOS and nix-darwin systems,
//! replacing colmena. It consumes fleet configuration (produced by the Nix library)
//! and orchestrates the evaluate → build → copy → activate pipeline using
//! consortium's DAG executor for maximum parallelism with per-host pipelining.
//!
//! ## Architecture
//!
//! The deployment pipeline has four stages, executed as a DAG:
//!
//! ```text
//! For each host:
//!   eval(host) → build(host) → copy(host) → activate(host)
//! ```
//!
//! Stages run in parallel across hosts (up to concurrency limits), and each
//! host can advance independently — host A can be copying while host B is
//! still building.
//!
//! Builder health checking ([`health`]) validates remote builders before use.
24
25pub mod activate;
26pub mod build;
27pub mod cascade;
28pub mod cascade_events;
29pub mod cascade_executor;
30pub mod cascade_integration;
31pub mod cascade_strategies;
32pub mod cascade_trace;
33pub mod config;
34pub mod copy;
35pub mod error;
36pub mod eval;
37pub mod health;
38pub mod tasks;
39
40pub use config::{DeployAction, DeploymentNode, DeploymentPlan, FleetConfig, ProfileType};
41pub use error::{NixError, Result};
42
43use consortium::dag::{DagContext, DagReport, ErrorPolicy, StageBuilder, TaskId};
44
45use crate::cascade_events::EventSink;
46use crate::cascade_integration::{cascade_copy_grouped, CascadeCopyConfig, CascadeCopyTarget};
47
48/// Run the full deployment pipeline using the DAG executor.
49///
50/// Each host progresses independently through eval → build → copy → activate,
51/// with per-stage concurrency limits. A host that fails at any stage is
52/// cancelled for subsequent stages without blocking other hosts.
53pub fn deploy(
54    config: &FleetConfig,
55    target_nodes: &[String],
56    action: DeployAction,
57    max_parallel: usize,
58    use_builders: bool,
59) -> Result<DeployReport> {
60    // Phase 0: Health check builders and prepare machines file
61    let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
62        let statuses = health::check_builders(config);
63        let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
64        if healthy.is_empty() {
65            eprintln!("warning: no healthy builders available, building locally");
66            None
67        } else {
68            match build::generate_machines_file_from_healthy(&healthy) {
69                Ok(path) => Some(path),
70                Err(e) => {
71                    eprintln!("warning: failed to generate machines file: {}", e);
72                    None
73                }
74            }
75        }
76    } else {
77        None
78    };
79
80    // Set up shared context
81    let ctx = DagContext::new();
82    ctx.set_state("fleet_config", config.clone());
83    ctx.set_state("action", action);
84    if let Some(ref path) = machines_file {
85        ctx.set_state("machines_file", path.clone());
86    }
87
88    // Determine stage concurrency limits
89    // eval: limited to 1 (nix evaluation is memory-heavy)
90    // build: up to max_parallel (nix distributes across builders internally)
91    // copy: up to max_parallel (IO-bound, can be aggressive)
92    // activate: limited to avoid overwhelming the fleet
93    let eval_limit = 1;
94    let build_limit = max_parallel;
95    let copy_limit = max_parallel;
96    let activate_limit = max_parallel.min(4);
97
98    // Build the deployment DAG
99    let mut builder = StageBuilder::new()
100        .resources(target_nodes.to_vec())
101        .stage("eval", Some(eval_limit), |host| {
102            Box::new(tasks::NixEvalTask::new(host))
103        })
104        .stage("build", Some(build_limit), |host| {
105            Box::new(tasks::NixBuildTask::new(host))
106        })
107        .error_policy(ErrorPolicy::ContinueIndependent)
108        .context(ctx);
109
110    // Only add copy + activate stages if not build-only
111    if action != DeployAction::Build {
112        builder = builder
113            .stage("copy", Some(copy_limit), |host| {
114                Box::new(tasks::NixCopyTask::new(host))
115            })
116            .stage("activate", Some(activate_limit), |host| {
117                Box::new(tasks::NixActivateTask::new(host))
118            });
119    }
120
121    let dag_report = builder
122        .build()
123        .map_err(|e| NixError::General(e.to_string()))?
124        .run()
125        .map_err(|e| NixError::General(e.to_string()))?;
126
127    Ok(DeployReport::from_dag_report(
128        &dag_report,
129        target_nodes,
130        action,
131    ))
132}
133
134/// Cascade-driven deploy: same eval/build/activate as [`deploy`], but
135/// the per-host `nix copy` stage is replaced by a single whole-fleet
136/// cascade that distributes each toplevel peer-to-peer.
137///
138/// ## Why
139///
140/// `deploy()` runs N parallel `nix copy` subprocesses, each pulling
141/// from the build host. The build host's uplink is the bottleneck —
142/// at scale the deploy serializes on it.
143///
144/// `deploy_with_cascade()` lets nodes that have already received the
145/// closure serve the next round's targets. With `fanout=2` and N hosts
146/// sharing one toplevel, copy time drops from `N * single_copy_duration`
147/// to `ceil(log2(N+1)) * single_copy_duration` in the best case.
148///
149/// ## How
150///
151/// 1. **DAG phase 1**: per-host eval + build (same as `deploy()`).
152/// 2. **Cascade phase**: collect built toplevels, group by toplevel,
153///    run one `cascade_copy_grouped()` per group. Live UI via
154///    `event_sink` if provided.
155/// 3. **DAG phase 2**: per-host activate (only for hosts whose copy
156///    succeeded; failed-copy hosts are reported as copy_failures).
157///
158/// ## When NOT to use
159///
160/// - 1-2 targets: cascade overhead > parallel direct copy. Use `deploy()`.
161/// - `action == DeployAction::Build`: nothing to copy. Use `deploy()`.
162/// - First-time use against an untrusted fleet: prefer `deploy()` first
163///   to validate SSH + signing trust path, then switch.
164pub fn deploy_with_cascade(
165    config: &FleetConfig,
166    target_nodes: &[String],
167    action: DeployAction,
168    max_parallel: usize,
169    use_builders: bool,
170    cascade_fanout: u32,
171    seed_addr: &str,
172    event_sink: Option<&dyn EventSink>,
173) -> Result<DeployReport> {
174    // Build-only path: no copy, no cascade — defer to deploy().
175    if action == DeployAction::Build {
176        return deploy(config, target_nodes, action, max_parallel, use_builders);
177    }
178
179    // Phase 0: builder health check (same as deploy()).
180    let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
181        let statuses = health::check_builders(config);
182        let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
183        if healthy.is_empty() {
184            eprintln!("warning: no healthy builders available, building locally");
185            None
186        } else {
187            match build::generate_machines_file_from_healthy(&healthy) {
188                Ok(path) => Some(path),
189                Err(e) => {
190                    eprintln!("warning: failed to generate machines file: {}", e);
191                    None
192                }
193            }
194        }
195    } else {
196        None
197    };
198
199    // Phase 1 DAG: eval + build only.
200    let ctx1 = DagContext::new();
201    ctx1.set_state("fleet_config", config.clone());
202    ctx1.set_state("action", action);
203    if let Some(ref path) = machines_file {
204        ctx1.set_state("machines_file", path.clone());
205    }
206    let phase1_report = StageBuilder::new()
207        .resources(target_nodes.to_vec())
208        .stage("eval", Some(1), |host| {
209            Box::new(tasks::NixEvalTask::new(host))
210        })
211        .stage("build", Some(max_parallel), |host| {
212            Box::new(tasks::NixBuildTask::new(host))
213        })
214        .error_policy(ErrorPolicy::ContinueIndependent)
215        .context(ctx1.clone())
216        .build()
217        .map_err(|e| NixError::General(e.to_string()))?
218        .run()
219        .map_err(|e| NixError::General(e.to_string()))?;
220
221    // Collect successfully-built (host, toplevel) pairs from ctx1
222    // outputs. Skip hosts whose build failed.
223    let mut targets_for_cascade: Vec<CascadeCopyTarget> = Vec::new();
224    let mut build_failures: Vec<(String, String)> = Vec::new();
225    for host in target_nodes {
226        let build_id = TaskId(format!("build:{}", host));
227        if let Some(err) = phase1_report.failed.get(&build_id) {
228            build_failures.push((host.clone(), err.clone()));
229            continue;
230        }
231        let Some(toplevel) = ctx1.get_output::<String>(&build_id) else {
232            build_failures.push((
233                host.clone(),
234                "build succeeded but produced no toplevel output".into(),
235            ));
236            continue;
237        };
238        let Some(node) = config.nodes.get(host) else {
239            build_failures.push((host.clone(), "host missing from fleet config".into()));
240            continue;
241        };
242        targets_for_cascade.push(CascadeCopyTarget {
243            host_name: host.clone(),
244            ssh_addr: format!("{}@{}", node.target_user, node.target_host),
245            toplevel_path: toplevel,
246        });
247    }
248
249    // Cascade phase: group by toplevel, fan-out per group.
250    let mut cfg = CascadeCopyConfig::new(seed_addr.to_string(), targets_for_cascade.clone())
251        .fanout(cascade_fanout);
252    if let Some(sink) = event_sink {
253        cfg = cfg.events(sink);
254    }
255    let cascade_result = cascade_copy_grouped(cfg);
256
257    // Build a synthetic Phase-2 DagContext that pre-loads the cascade
258    // results into "copy:{host}" outputs so NixActivateTask can read
259    // them without modification.
260    let ctx2 = DagContext::new();
261    ctx2.set_state("fleet_config", config.clone());
262    ctx2.set_state("action", action);
263
264    // Carry the toplevels forward so activate can find them. Hosts
265    // whose copy failed are excluded — they won't be in the activate
266    // resource list.
267    let copied_set: std::collections::HashSet<&String> = cascade_result.copied.iter().collect();
268    let mut activate_targets: Vec<String> = Vec::new();
269    for t in &targets_for_cascade {
270        if copied_set.contains(&t.host_name) {
271            ctx2.set_output(
272                TaskId(format!("copy:{}", t.host_name)),
273                t.toplevel_path.clone(),
274            );
275            activate_targets.push(t.host_name.clone());
276        }
277    }
278
279    // Phase 2 DAG: activate only.
280    let phase2_report = if !activate_targets.is_empty() {
281        StageBuilder::new()
282            .resources(activate_targets.clone())
283            .stage("activate", Some(max_parallel.min(4)), |host| {
284                Box::new(tasks::NixActivateTask::new(host))
285            })
286            .error_policy(ErrorPolicy::ContinueIndependent)
287            .context(ctx2)
288            .build()
289            .map_err(|e| NixError::General(e.to_string()))?
290            .run()
291            .map_err(|e| NixError::General(e.to_string()))?
292    } else {
293        DagReport {
294            completed: Default::default(),
295            skipped: Default::default(),
296            failed: Default::default(),
297            cancelled: Default::default(),
298        }
299    };
300
301    // Synthesize a DeployReport from the three phases.
302    let mut activated = Vec::new();
303    let mut activation_failures = Vec::new();
304    for host in &activate_targets {
305        let id = TaskId(format!("activate:{}", host));
306        if phase2_report.completed.contains(&id) || phase2_report.skipped.contains(&id) {
307            activated.push(host.clone());
308        } else if let Some(err) = phase2_report.failed.get(&id) {
309            activation_failures.push((host.clone(), err.clone()));
310        }
311    }
312
313    let built: Vec<String> = target_nodes
314        .iter()
315        .filter(|h| !build_failures.iter().any(|(b, _)| &b == h))
316        .cloned()
317        .collect();
318
319    let copy_failures: Vec<(String, String)> = cascade_result
320        .failed
321        .into_iter()
322        .map(|(h, e)| (h, e))
323        .collect();
324
325    Ok(DeployReport {
326        built,
327        copied: cascade_result.copied,
328        activated,
329        build_failures,
330        copy_failures,
331        activation_failures,
332    })
333}
334
/// Summary of a deployment run.
///
/// Each list holds host names; the `*_failures` lists pair the host name with
/// the error message reported for that phase.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeployReport {
    /// Hosts whose closures were built successfully.
    pub built: Vec<String>,
    /// Hosts whose closures were copied successfully.
    pub copied: Vec<String>,
    /// Hosts that were activated successfully.
    pub activated: Vec<String>,
    /// Hosts that did not produce a built closure (name, error message).
    pub build_failures: Vec<(String, String)>,
    /// Hosts that failed closure copy (name, error message).
    pub copy_failures: Vec<(String, String)>,
    /// Hosts that failed activation (name, error message).
    pub activation_failures: Vec<(String, String)>,
}
351
352impl DeployReport {
353    /// Whether the deployment was fully successful (no failures).
354    pub fn is_success(&self) -> bool {
355        self.build_failures.is_empty()
356            && self.copy_failures.is_empty()
357            && self.activation_failures.is_empty()
358    }
359
360    /// Total number of failures across all phases.
361    pub fn failure_count(&self) -> usize {
362        self.build_failures.len() + self.copy_failures.len() + self.activation_failures.len()
363    }
364
365    /// Number of hosts that completed successfully (all phases).
366    pub fn success_count(&self) -> usize {
367        self.activated.len()
368    }
369
370    /// Build a DeployReport from a DagReport by inspecting task IDs.
371    fn from_dag_report(report: &DagReport, target_nodes: &[String], action: DeployAction) -> Self {
372        let mut built = Vec::new();
373        let mut copied = Vec::new();
374        let mut activated = Vec::new();
375        let mut build_failures = Vec::new();
376        let mut copy_failures = Vec::new();
377        let mut activation_failures = Vec::new();
378
379        for host in target_nodes {
380            let build_id = consortium::dag::TaskId(format!("build:{}", host));
381            let copy_id = consortium::dag::TaskId(format!("copy:{}", host));
382            let activate_id = consortium::dag::TaskId(format!("activate:{}", host));
383
384            // Check build
385            if report.completed.contains(&build_id) || report.skipped.contains(&build_id) {
386                built.push(host.clone());
387            } else if let Some(err) = report.failed.get(&build_id) {
388                build_failures.push((host.clone(), err.clone()));
389            }
390            // cancelled builds are not reported as failures — they're implied by an earlier failure
391
392            if action == DeployAction::Build {
393                continue;
394            }
395
396            // Check copy
397            if report.completed.contains(&copy_id) || report.skipped.contains(&copy_id) {
398                copied.push(host.clone());
399            } else if let Some(err) = report.failed.get(&copy_id) {
400                copy_failures.push((host.clone(), err.clone()));
401            }
402
403            // Check activate
404            if report.completed.contains(&activate_id) || report.skipped.contains(&activate_id) {
405                activated.push(host.clone());
406            } else if let Some(err) = report.failed.get(&activate_id) {
407                activation_failures.push((host.clone(), err.clone()));
408            }
409        }
410
411        Self {
412            built,
413            copied,
414            activated,
415            build_failures,
416            copy_failures,
417            activation_failures,
418        }
419    }
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use consortium::dag::{DagReport, TaskId};
    use std::collections::HashSet;

    /// Build a `DagReport` from plain task-id strings; `skipped` is left empty.
    fn make_report(completed: &[&str], failed: &[(&str, &str)], cancelled: &[&str]) -> DagReport {
        DagReport {
            completed: completed.iter().map(|s| TaskId(s.to_string())).collect(),
            skipped: HashSet::new(),
            failed: failed
                .iter()
                .map(|(k, v)| (TaskId(k.to_string()), v.to_string()))
                .collect(),
            cancelled: cancelled.iter().map(|s| TaskId(s.to_string())).collect(),
        }
    }

    #[test]
    fn test_deploy_report_all_success() {
        let dag_report = make_report(
            &[
                "eval:hp01",
                "build:hp01",
                "copy:hp01",
                "activate:hp01",
                "eval:hp02",
                "build:hp02",
                "copy:hp02",
                "activate:hp02",
            ],
            &[],
            &[],
        );
        let targets = vec!["hp01".to_string(), "hp02".to_string()];
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]);
        assert_eq!(report.copied, vec!["hp01", "hp02"]);
        assert_eq!(report.activated, vec!["hp01", "hp02"]);
        assert_eq!(report.failure_count(), 0);
        assert_eq!(report.success_count(), 2);
    }

    #[test]
    fn test_deploy_report_build_failure() {
        let dag_report = make_report(
            &[
                "eval:hp01",
                "eval:hp02",
                "build:hp01",
                "copy:hp01",
                "activate:hp01",
            ],
            &[("build:hp02", "nix build failed")],
            &["copy:hp02", "activate:hp02"],
        );
        let targets = vec!["hp01".to_string(), "hp02".to_string()];
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(!report.is_success());
        assert_eq!(report.built, vec!["hp01"]);
        assert_eq!(report.activated, vec!["hp01"]);
        assert_eq!(
            report.build_failures,
            vec![("hp02".to_string(), "nix build failed".to_string())]
        );
        // copy and activate for hp02 are cancelled, not failed
        assert!(report.copy_failures.is_empty());
        assert!(report.activation_failures.is_empty());
    }

    #[test]
    fn test_deploy_report_copy_failure() {
        let dag_report = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[("copy:hp01", "ssh connection refused")],
            &["activate:hp01"],
        );
        let targets = vec!["hp01".to_string(), "hp02".to_string()];
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(!report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]); // both built
        assert_eq!(
            report.copy_failures,
            vec![("hp01".to_string(), "ssh connection refused".to_string())]
        );
        // activate for hp01 is cancelled, not failed
        assert!(report.activation_failures.is_empty());
        assert_eq!(report.failure_count(), 1);
    }

    #[test]
    fn test_deploy_report_build_only() {
        let dag_report = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[],
            &[],
        );
        let targets = vec!["hp01".to_string(), "hp02".to_string()];
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Build);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]);
        assert!(report.copied.is_empty());
        assert!(report.activated.is_empty());
    }

    #[test]
    fn test_deploy_report_skipped_counts_as_success() {
        // Skipped tasks are treated the same as completed ones.
        let dag_report = DagReport {
            skipped: ["build:hp01", "copy:hp01", "activate:hp01"]
                .iter()
                .map(|s| TaskId(s.to_string()))
                .collect(),
            ..make_report(&["eval:hp01"], &[], &[])
        };
        let targets = vec!["hp01".to_string()];
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01"]);
        assert_eq!(report.copied, vec!["hp01"]);
        assert_eq!(report.activated, vec!["hp01"]);
    }

    #[test]
    fn test_deploy_action_display_roundtrip() {
        for action in &["switch", "boot", "test", "dry-activate", "build"] {
            let parsed: DeployAction = action.parse().unwrap();
            assert_eq!(parsed.to_string(), *action);
        }
    }

    #[test]
    fn test_deploy_action_invalid() {
        assert!("reboot".parse::<DeployAction>().is_err());
        assert!("".parse::<DeployAction>().is_err());
        assert!("SWITCH".parse::<DeployAction>().is_err());
    }
}