1pub mod activate;
26pub mod build;
27pub mod cascade;
28pub mod cascade_events;
29pub mod cascade_executor;
30pub mod cascade_integration;
31pub mod cascade_strategies;
32pub mod cascade_trace;
33pub mod config;
34pub mod copy;
35pub mod error;
36pub mod eval;
37pub mod health;
38pub mod tasks;
39
40pub use config::{DeployAction, DeploymentNode, DeploymentPlan, FleetConfig, ProfileType};
41pub use error::{NixError, Result};
42
43use consortium::dag::{DagContext, DagReport, ErrorPolicy, StageBuilder, TaskId};
44
45use crate::cascade_events::EventSink;
46use crate::cascade_integration::{cascade_copy_grouped, CascadeCopyConfig, CascadeCopyTarget};
47
48pub fn deploy(
54 config: &FleetConfig,
55 target_nodes: &[String],
56 action: DeployAction,
57 max_parallel: usize,
58 use_builders: bool,
59) -> Result<DeployReport> {
60 let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
62 let statuses = health::check_builders(config);
63 let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
64 if healthy.is_empty() {
65 eprintln!("warning: no healthy builders available, building locally");
66 None
67 } else {
68 match build::generate_machines_file_from_healthy(&healthy) {
69 Ok(path) => Some(path),
70 Err(e) => {
71 eprintln!("warning: failed to generate machines file: {}", e);
72 None
73 }
74 }
75 }
76 } else {
77 None
78 };
79
80 let ctx = DagContext::new();
82 ctx.set_state("fleet_config", config.clone());
83 ctx.set_state("action", action);
84 if let Some(ref path) = machines_file {
85 ctx.set_state("machines_file", path.clone());
86 }
87
88 let eval_limit = 1;
94 let build_limit = max_parallel;
95 let copy_limit = max_parallel;
96 let activate_limit = max_parallel.min(4);
97
98 let mut builder = StageBuilder::new()
100 .resources(target_nodes.to_vec())
101 .stage("eval", Some(eval_limit), |host| {
102 Box::new(tasks::NixEvalTask::new(host))
103 })
104 .stage("build", Some(build_limit), |host| {
105 Box::new(tasks::NixBuildTask::new(host))
106 })
107 .error_policy(ErrorPolicy::ContinueIndependent)
108 .context(ctx);
109
110 if action != DeployAction::Build {
112 builder = builder
113 .stage("copy", Some(copy_limit), |host| {
114 Box::new(tasks::NixCopyTask::new(host))
115 })
116 .stage("activate", Some(activate_limit), |host| {
117 Box::new(tasks::NixActivateTask::new(host))
118 });
119 }
120
121 let dag_report = builder
122 .build()
123 .map_err(|e| NixError::General(e.to_string()))?
124 .run()
125 .map_err(|e| NixError::General(e.to_string()))?;
126
127 Ok(DeployReport::from_dag_report(
128 &dag_report,
129 target_nodes,
130 action,
131 ))
132}
133
134pub fn deploy_with_cascade(
165 config: &FleetConfig,
166 target_nodes: &[String],
167 action: DeployAction,
168 max_parallel: usize,
169 use_builders: bool,
170 cascade_fanout: u32,
171 seed_addr: &str,
172 event_sink: Option<&dyn EventSink>,
173) -> Result<DeployReport> {
174 if action == DeployAction::Build {
176 return deploy(config, target_nodes, action, max_parallel, use_builders);
177 }
178
179 let machines_file: Option<String> = if use_builders && !config.builders.is_empty() {
181 let statuses = health::check_builders(config);
182 let healthy: Vec<_> = statuses.iter().filter(|s| s.healthy).cloned().collect();
183 if healthy.is_empty() {
184 eprintln!("warning: no healthy builders available, building locally");
185 None
186 } else {
187 match build::generate_machines_file_from_healthy(&healthy) {
188 Ok(path) => Some(path),
189 Err(e) => {
190 eprintln!("warning: failed to generate machines file: {}", e);
191 None
192 }
193 }
194 }
195 } else {
196 None
197 };
198
199 let ctx1 = DagContext::new();
201 ctx1.set_state("fleet_config", config.clone());
202 ctx1.set_state("action", action);
203 if let Some(ref path) = machines_file {
204 ctx1.set_state("machines_file", path.clone());
205 }
206 let phase1_report = StageBuilder::new()
207 .resources(target_nodes.to_vec())
208 .stage("eval", Some(1), |host| {
209 Box::new(tasks::NixEvalTask::new(host))
210 })
211 .stage("build", Some(max_parallel), |host| {
212 Box::new(tasks::NixBuildTask::new(host))
213 })
214 .error_policy(ErrorPolicy::ContinueIndependent)
215 .context(ctx1.clone())
216 .build()
217 .map_err(|e| NixError::General(e.to_string()))?
218 .run()
219 .map_err(|e| NixError::General(e.to_string()))?;
220
221 let mut targets_for_cascade: Vec<CascadeCopyTarget> = Vec::new();
224 let mut build_failures: Vec<(String, String)> = Vec::new();
225 for host in target_nodes {
226 let build_id = TaskId(format!("build:{}", host));
227 if let Some(err) = phase1_report.failed.get(&build_id) {
228 build_failures.push((host.clone(), err.clone()));
229 continue;
230 }
231 let Some(toplevel) = ctx1.get_output::<String>(&build_id) else {
232 build_failures.push((
233 host.clone(),
234 "build succeeded but produced no toplevel output".into(),
235 ));
236 continue;
237 };
238 let Some(node) = config.nodes.get(host) else {
239 build_failures.push((host.clone(), "host missing from fleet config".into()));
240 continue;
241 };
242 targets_for_cascade.push(CascadeCopyTarget {
243 host_name: host.clone(),
244 ssh_addr: format!("{}@{}", node.target_user, node.target_host),
245 toplevel_path: toplevel,
246 });
247 }
248
249 let mut cfg = CascadeCopyConfig::new(seed_addr.to_string(), targets_for_cascade.clone())
251 .fanout(cascade_fanout);
252 if let Some(sink) = event_sink {
253 cfg = cfg.events(sink);
254 }
255 let cascade_result = cascade_copy_grouped(cfg);
256
257 let ctx2 = DagContext::new();
261 ctx2.set_state("fleet_config", config.clone());
262 ctx2.set_state("action", action);
263
264 let copied_set: std::collections::HashSet<&String> = cascade_result.copied.iter().collect();
268 let mut activate_targets: Vec<String> = Vec::new();
269 for t in &targets_for_cascade {
270 if copied_set.contains(&t.host_name) {
271 ctx2.set_output(
272 TaskId(format!("copy:{}", t.host_name)),
273 t.toplevel_path.clone(),
274 );
275 activate_targets.push(t.host_name.clone());
276 }
277 }
278
279 let phase2_report = if !activate_targets.is_empty() {
281 StageBuilder::new()
282 .resources(activate_targets.clone())
283 .stage("activate", Some(max_parallel.min(4)), |host| {
284 Box::new(tasks::NixActivateTask::new(host))
285 })
286 .error_policy(ErrorPolicy::ContinueIndependent)
287 .context(ctx2)
288 .build()
289 .map_err(|e| NixError::General(e.to_string()))?
290 .run()
291 .map_err(|e| NixError::General(e.to_string()))?
292 } else {
293 DagReport {
294 completed: Default::default(),
295 skipped: Default::default(),
296 failed: Default::default(),
297 cancelled: Default::default(),
298 }
299 };
300
301 let mut activated = Vec::new();
303 let mut activation_failures = Vec::new();
304 for host in &activate_targets {
305 let id = TaskId(format!("activate:{}", host));
306 if phase2_report.completed.contains(&id) || phase2_report.skipped.contains(&id) {
307 activated.push(host.clone());
308 } else if let Some(err) = phase2_report.failed.get(&id) {
309 activation_failures.push((host.clone(), err.clone()));
310 }
311 }
312
313 let built: Vec<String> = target_nodes
314 .iter()
315 .filter(|h| !build_failures.iter().any(|(b, _)| &b == h))
316 .cloned()
317 .collect();
318
319 let copy_failures: Vec<(String, String)> = cascade_result
320 .failed
321 .into_iter()
322 .map(|(h, e)| (h, e))
323 .collect();
324
325 Ok(DeployReport {
326 built,
327 copied: cascade_result.copied,
328 activated,
329 build_failures,
330 copy_failures,
331 activation_failures,
332 })
333}
334
/// Per-host outcome of a deployment, grouped by phase.
///
/// Host lists hold host names. Failure lists hold `(host, error message)`
/// pairs.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeployReport {
    /// Hosts whose build succeeded.
    pub built: Vec<String>,
    /// Hosts the built system was copied to.
    pub copied: Vec<String>,
    /// Hosts that were activated successfully.
    pub activated: Vec<String>,
    /// Hosts that could not be built (or evaluated), with the error.
    pub build_failures: Vec<(String, String)>,
    /// Hosts the built system could not be copied to, with the error.
    pub copy_failures: Vec<(String, String)>,
    /// Hosts whose activation failed, with the error.
    pub activation_failures: Vec<(String, String)>,
}
351
352impl DeployReport {
353 pub fn is_success(&self) -> bool {
355 self.build_failures.is_empty()
356 && self.copy_failures.is_empty()
357 && self.activation_failures.is_empty()
358 }
359
360 pub fn failure_count(&self) -> usize {
362 self.build_failures.len() + self.copy_failures.len() + self.activation_failures.len()
363 }
364
365 pub fn success_count(&self) -> usize {
367 self.activated.len()
368 }
369
370 fn from_dag_report(report: &DagReport, target_nodes: &[String], action: DeployAction) -> Self {
372 let mut built = Vec::new();
373 let mut copied = Vec::new();
374 let mut activated = Vec::new();
375 let mut build_failures = Vec::new();
376 let mut copy_failures = Vec::new();
377 let mut activation_failures = Vec::new();
378
379 for host in target_nodes {
380 let build_id = consortium::dag::TaskId(format!("build:{}", host));
381 let copy_id = consortium::dag::TaskId(format!("copy:{}", host));
382 let activate_id = consortium::dag::TaskId(format!("activate:{}", host));
383
384 if report.completed.contains(&build_id) || report.skipped.contains(&build_id) {
386 built.push(host.clone());
387 } else if let Some(err) = report.failed.get(&build_id) {
388 build_failures.push((host.clone(), err.clone()));
389 }
390 if action == DeployAction::Build {
393 continue;
394 }
395
396 if report.completed.contains(©_id) || report.skipped.contains(©_id) {
398 copied.push(host.clone());
399 } else if let Some(err) = report.failed.get(©_id) {
400 copy_failures.push((host.clone(), err.clone()));
401 }
402
403 if report.completed.contains(&activate_id) || report.skipped.contains(&activate_id) {
405 activated.push(host.clone());
406 } else if let Some(err) = report.failed.get(&activate_id) {
407 activation_failures.push((host.clone(), err.clone()));
408 }
409 }
410
411 Self {
412 built,
413 copied,
414 activated,
415 build_failures,
416 copy_failures,
417 activation_failures,
418 }
419 }
420}
421
#[cfg(test)]
mod tests {
    use super::*;
    use consortium::dag::{DagReport, TaskId};
    use std::collections::HashSet;

    /// Build a `DagReport` from string task ids. `skipped` starts empty.
    fn make_report(completed: &[&str], failed: &[(&str, &str)], cancelled: &[&str]) -> DagReport {
        DagReport {
            completed: completed.iter().map(|s| TaskId(s.to_string())).collect(),
            skipped: HashSet::new(),
            failed: failed
                .iter()
                .map(|(k, v)| (TaskId(k.to_string()), v.to_string()))
                .collect(),
            cancelled: cancelled.iter().map(|s| TaskId(s.to_string())).collect(),
        }
    }

    /// Owned host list in the shape `from_dag_report` expects.
    fn hosts(names: &[&str]) -> Vec<String> {
        names.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn test_deploy_report_all_success() {
        let dag_report = make_report(
            &[
                "eval:hp01",
                "build:hp01",
                "copy:hp01",
                "activate:hp01",
                "eval:hp02",
                "build:hp02",
                "copy:hp02",
                "activate:hp02",
            ],
            &[],
            &[],
        );
        let targets = hosts(&["hp01", "hp02"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]);
        assert_eq!(report.copied, vec!["hp01", "hp02"]);
        assert_eq!(report.activated, vec!["hp01", "hp02"]);
        assert_eq!(report.failure_count(), 0);
    }

    #[test]
    fn test_deploy_report_build_failure() {
        let dag_report = make_report(
            &[
                "eval:hp01",
                "eval:hp02",
                "build:hp01",
                "copy:hp01",
                "activate:hp01",
            ],
            &[("build:hp02", "nix build failed")],
            &["copy:hp02", "activate:hp02"],
        );
        let targets = hosts(&["hp01", "hp02"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(!report.is_success());
        assert_eq!(report.built, vec!["hp01"]);
        assert_eq!(report.activated, vec!["hp01"]);
        assert_eq!(
            report.build_failures,
            vec![("hp02".to_string(), "nix build failed".to_string())]
        );
        // Downstream cancellations are not reported as separate failures.
        assert!(report.copy_failures.is_empty());
        assert!(report.activation_failures.is_empty());
    }

    #[test]
    fn test_deploy_report_copy_failure() {
        let dag_report = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[("copy:hp01", "ssh connection refused")],
            &["activate:hp01"],
        );
        let targets = hosts(&["hp01", "hp02"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(!report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]);
        assert!(report.copied.is_empty());
        assert_eq!(
            report.copy_failures,
            vec![("hp01".to_string(), "ssh connection refused".to_string())]
        );
        assert!(report.activation_failures.is_empty());
    }

    #[test]
    fn test_deploy_report_build_only() {
        let dag_report = make_report(
            &["eval:hp01", "build:hp01", "eval:hp02", "build:hp02"],
            &[],
            &[],
        );
        let targets = hosts(&["hp01", "hp02"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Build);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02"]);
        assert!(report.copied.is_empty());
        assert!(report.activated.is_empty());
    }

    #[test]
    fn test_deploy_report_skipped_counts_as_success() {
        let mut dag_report = make_report(&["eval:hp01", "copy:hp01", "activate:hp01"], &[], &[]);
        dag_report.skipped.insert(TaskId("build:hp01".to_string()));
        let targets = hosts(&["hp01"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(report.is_success());
        assert_eq!(report.built, vec!["hp01"]);
        assert_eq!(report.success_count(), 1);
    }

    #[test]
    fn test_deploy_report_counts_across_phases() {
        let dag_report = make_report(
            &[
                "eval:hp01",
                "build:hp01",
                "copy:hp01",
                "eval:hp02",
                "build:hp02",
                "eval:hp03",
                "build:hp03",
                "copy:hp03",
                "activate:hp03",
            ],
            &[
                ("copy:hp02", "ssh timeout"),
                ("activate:hp01", "switch-to-configuration failed"),
            ],
            &["activate:hp02"],
        );
        let targets = hosts(&["hp01", "hp02", "hp03"]);
        let report = DeployReport::from_dag_report(&dag_report, &targets, DeployAction::Switch);

        assert!(!report.is_success());
        assert_eq!(report.built, vec!["hp01", "hp02", "hp03"]);
        assert_eq!(report.copied, vec!["hp01", "hp03"]);
        assert_eq!(report.activated, vec!["hp03"]);
        assert_eq!(
            report.copy_failures,
            vec![("hp02".to_string(), "ssh timeout".to_string())]
        );
        assert_eq!(
            report.activation_failures,
            vec![(
                "hp01".to_string(),
                "switch-to-configuration failed".to_string()
            )]
        );
        assert_eq!(report.failure_count(), 2);
        assert_eq!(report.success_count(), 1);
    }

    #[test]
    fn test_deploy_action_display_roundtrip() {
        for action in &["switch", "boot", "test", "dry-activate", "build"] {
            let parsed: DeployAction = action.parse().unwrap();
            assert_eq!(parsed.to_string(), *action);
        }
    }

    #[test]
    fn test_deploy_action_invalid() {
        assert!("reboot".parse::<DeployAction>().is_err());
        assert!("".parse::<DeployAction>().is_err());
        assert!("SWITCH".parse::<DeployAction>().is_err());
    }
}