// cindy-cli 0.2.0
//
// Managing infrastructure at breakneck speed.

mod conditional_sudo_command;
mod limit;
mod picker;
mod progress;
mod secret;

use clap::Parser;
use eyre::{ContextCompat as _, WrapErr as _};
use futures_util::future::try_join_all;
use openssh::Stdio;
use std::collections::{HashMap, HashSet};
use std::io::IsTerminal as _;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tracing::Instrument as _;

use crate::conditional_sudo_command::ConditionalSudoCommand as _;

/// Stand-in for the unstable `ExitStatus::exit_ok`: lets callers use `?` on
/// a finished process so a non-zero exit is not silently ignored.
trait ExitOk {
    /// `Ok(())` when the process succeeded, otherwise an error naming the
    /// exit status.
    fn exit_ok(self) -> eyre::Result<()>;
}
impl ExitOk for std::process::ExitStatus {
    fn exit_ok(self) -> eyre::Result<()> {
        // Guard clause: anything other than success becomes an error.
        if !self.success() {
            return Err(eyre::eyre!("process exited with {self}"));
        }
        Ok(())
    }
}

/// Counterpart of [`ExitOk`] for a captured [`std::process::Output`]: hands
/// the output back on success and turns the captured stderr into the error
/// message otherwise.
trait OutputOrError: Sized {
    /// Returns `self` unchanged when the process succeeded; otherwise an
    /// error whose text is the (lossily decoded) stderr.
    fn ok_or_die(self) -> eyre::Result<Self>;
}
impl OutputOrError for std::process::Output {
    fn ok_or_die(self) -> eyre::Result<Self> {
        if self.status.success() {
            return Ok(self);
        }
        Err(eyre::eyre!("{}", String::from_utf8_lossy(&self.stderr)))
    }
}

// Top-level subcommands of the CLI.
//
// NOTE: plain `//` comments are used here on purpose — `///` doc comments on
// clap-derived items become `--help` text, which would change user-visible
// output.
#[derive(Debug, clap::Subcommand)]
enum Subcommand {
    // Run the play against the inventory's hosts.
    Play {
        // Optional host-selection expression; handed to
        // `limit::select_indices` together with each host's name and tags.
        #[clap(long)]
        limit: Option<String>,
        // When set, `sudo` is never used on the targets (see the `use_sudo`
        // computation during discovery).
        #[clap(long)]
        unprivileged: bool,
    },
    // Print the inventory and exit.
    Inventory {
        // Narrow the output to a single host, looked up by name.
        #[clap(long)]
        host: Option<String>,
        // Emit pretty-printed JSON instead of the `Debug` rendering.
        #[clap(long)]
        json: bool,
        // Ask the orchestrator dump to run in reveal mode (sets
        // `CINDY_REVEAL_SECRETS` for the dump process).
        #[clap(long)]
        reveal: bool,
    },
    /// Manage vaulted secrets (`Secret<T>`-typed values in your
    /// inventory). See `cindy secret --help` for sub-commands.
    #[clap(subcommand)]
    Secret(secret::SecretCommand),
}

// Parsed command line: just the chosen subcommand.
//
// NOTE: kept as a `//` comment — a `///` doc comment on a clap `Parser`
// struct would be picked up as the program's help text.
#[derive(Debug, Parser)]
struct Args {
    // Which of `play` / `inventory` / `secret` was requested.
    #[command(subcommand)]
    subcommand: Subcommand,
}

/// A selected inventory host after the SSH pre-flight: the host entry plus
/// everything discovery learned about the live machine.
#[derive(Debug)]
struct DiscoveredHost {
    /// The inventory entry (name, tags, vars) this target came from.
    host: cindy::Host<serde_json::Value>,
    /// SSH session opened to `host.name` during discovery; reused for the
    /// worker upload and run.
    session: openssh::Session,
    /// Platform looked up from the `<uname -m>-unknown-linux-musl` triplet;
    /// selects which compiled worker binary is uploaded.
    platform: &'static platforms::Platform,
    /// Whether remote commands are run through sudo: true only when
    /// `--unprivileged` was not given and the remote `id -u` is not `0`.
    use_sudo: bool,
}

/// Read a child's stderr line by line and forward each (tagged with its
/// `source` and host `idx`) to the progress view. Generic over the reader so
/// it serves both the SSH worker and the local orchestrator.
///
/// Lines are read as raw bytes and decoded lossily, so output that is not
/// valid UTF-8 is still forwarded (with replacement characters) instead of
/// ending the loop early and leaving the child's stderr pipe undrained.
/// Trailing whitespace, including the line terminator, is stripped.
///
/// Ends at EOF or on the first read error. A closed progress channel does
/// not stop the loop: the reader keeps being drained.
async fn forward_stderr(
    mut reader: impl AsyncBufRead + Unpin,
    tx: tokio::sync::mpsc::UnboundedSender<progress::Msg>,
    idx: usize,
    source: progress::Source,
) {
    // One buffer, reused for every line.
    let mut raw = Vec::new();
    loop {
        raw.clear();
        match reader.read_until(b'\n', &mut raw).await {
            // EOF, or an I/O error we cannot recover from: stop forwarding.
            Ok(0) | Err(_) => break,
            Ok(_) => {}
        }
        let line = String::from_utf8_lossy(&raw);
        // Best-effort: the view may already be gone.
        let _ = tx.send(progress::Msg::Line(idx, source, line.trim_end().to_owned()));
    }
}

/// The inventory as pulled from the orchestrator: the type-erased
/// `Inventory<Value>` used for `--limit` targeting, plus the per-host
/// (and whole-inventory) `Debug` renderings the orchestrator produced
/// from the user's real `V`.
struct PulledInventory {
    /// Inventory decoded from the dump's `json` field.
    inventory: cindy::Inventory<serde_json::Value>,
    /// `Debug` renderings from the dump, looked up by host name, or by
    /// `cindy::inventory::WHOLE_INVENTORY_KEY` for the whole inventory.
    debug: std::collections::BTreeMap<String, String>,
    /// Present only when the dump ran in reveal mode. `Ok` holds the
    /// secrets-decrypted whole-inventory JSON; `Err` holds the failure
    /// (e.g. a missing vault key) to surface if `--json` was requested.
    json_revealed: Option<Result<serde_json::Value, String>>,
}

/// Run the orchestrator binary in inventory-dump mode and decode what it
/// prints.
///
/// The child is started with `CINDY_DUMP_INVENTORY=1` (plus
/// `CINDY_REVEAL_SECRETS=1` when `reveal_secrets` is set), with no stdin and
/// both output streams captured. Its stdout is parsed as an `InventoryDump`,
/// and the dump's `json` field is then decoded into the type-erased
/// inventory.
///
/// # Errors
///
/// Fails if the child cannot be run, exits unsuccessfully (its stderr is
/// included in the message), or prints output that does not parse.
async fn get_inventory(
    orchestrator_exe_path: &Path,
    reveal_secrets: bool,
) -> eyre::Result<PulledInventory> {
    tracing::info!("Pulling inventory from orchestrator binary...");

    let mut dump_cmd = tokio::process::Command::new(orchestrator_exe_path);
    dump_cmd
        .env("CINDY_DUMP_INVENTORY", "1")
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped());
    if reveal_secrets {
        // In reveal mode the orchestrator decrypts `Secret<T>` values while
        // rendering the Debug map; it reads vault DEKs from
        // `keys/<vault>.dek` relative to its cwd, which is this process's
        // cwd.
        dump_cmd.env("CINDY_REVEAL_SECRETS", "1");
    }

    let output = dump_cmd.output().await?;
    eyre::ensure!(
        output.status.success(),
        "inventory dump exited with {:?}\n{}",
        output.status,
        String::from_utf8_lossy(&output.stderr)
    );

    let parsed: cindy::inventory::InventoryDump = serde_json::from_slice(&output.stdout)
        .context("Failed to parse inventory dump from orchestrator")?;
    let cindy::inventory::InventoryDump {
        json,
        debug,
        json_revealed,
        ..
    } = parsed;
    let inventory = serde_json::from_value(json)
        .context("Failed to parse inventory JSON from orchestrator dump")?;

    Ok(PulledInventory {
        inventory,
        debug,
        json_revealed,
    })
}

/// Absolute path to the Cargo workspace root, resolved via `cargo metadata`.
///
/// Every build the CLI drives uses an absolute `--target-dir` anchored here
/// so the worker/orchestrator binaries land — and are read back from — the
/// same place regardless of the directory `cindy` was invoked from. (A
/// relative `--target-dir` is relative to the *cwd*, which silently diverges
/// from the absolute paths we later compute, leaving us reading a binary that
/// was written somewhere else.)
fn workspace_root() -> eyre::Result<PathBuf> {
    let meta = cargo_metadata::MetadataCommand::new().exec()?;
    Ok(meta.workspace_root.into_std_path_buf())
}

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let args = Args::parse();
    color_eyre::install()?;
    let workspace_root = workspace_root()?;
    // The run view (progress::run) takes over stderr with an alternate
    // screen when stderr is a tty. A tracing layer writing to that same
    // stderr would punch through and corrupt the TUI, so on a tty we route
    // logs into the TUI's log pane instead of stderr.
    if std::io::stderr().is_terminal() {
        // Keep ANSI colour in the log lines; the log pane parses it back
        // into styled spans (see progress::draw_log).
        tracing_subscriber::fmt()
            .with_ansi(true)
            .with_writer(progress::LogWriter)
            .init();
    } else {
        tracing_subscriber::fmt()
            .with_writer(std::io::stderr)
            .init();
    }

    // The `secret` subcommands typically don't need the orchestrator
    // — most touch only local files / the SSH agent. The exception is
    // `secret seal`, which has to ask the orchestrator binary to walk
    // its `inventory::iter::<PendingSecret>()` and emit ciphertexts.
    // We compile lazily for that one and short-circuit otherwise.
    if let Subcommand::Secret(cmd) = args.subcommand {
        let orch = if secret::requires_orchestrator(&cmd) {
            Some(compile_orchestrator(&workspace_root).await?)
        } else {
            None
        };
        return secret::dispatch(cmd, orch.as_deref()).await;
    }

    // For a `play` run, bring the live view up *now* — before the slow
    // orchestrator/worker compiles and SSH discovery — so the user sees
    // immediate activity (compile/discovery progress flows into the log
    // pane via tracing) instead of a seemingly frozen terminal. The host
    // list starts empty and is populated with `AddHost` once discovery
    // knows the targets. `Inventory`/`Secret` runs print to stdout and must
    // not take over the screen, so they keep `view` as `None`.
    let is_play = matches!(args.subcommand, Subcommand::Play { .. });
    let (progress_tx, progress_rx) = tokio::sync::mpsc::unbounded_channel::<progress::Msg>();
    let view = is_play.then(|| tokio::spawn(progress::run(progress_rx)));

    // Everything below can fail with `?`/`bail!`. While the `play` view owns
    // the alternate screen, returning early would leave the terminal in raw
    // mode with the alt-screen still up. So we run the whole flow in an inner
    // block, capture its result, then *always* close the channel and await
    // the view (letting it restore the terminal) before propagating.
    let result: eyre::Result<()> = run_after_launch(
        args.subcommand,
        workspace_root,
        progress_tx.clone(),
    )
    .await;

    // Closing our keeper sender lets the view's channel drain and close; the
    // deployment futures hold their own clones until they finish.
    drop(progress_tx);
    if let Some(view) = view {
        // Await the view so it leaves the alternate screen / raw mode before
        // we print anything below.
        let _ = view.await;
    }

    // A completed-but-failed run is reported as a single summary line and a
    // non-zero exit, without eyre's backtrace decoration. Any other error
    // propagates normally (eyre prints it).
    match result {
        Ok(()) => {
            tracing::info!("All pipeline target tracking execution runs completed successfully!");
            Ok(())
        }
        Err(e) if e.downcast_ref::<RunFailed>().is_some() => {
            eprintln!("\x1b[31m{e}\x1b[0m");
            std::process::exit(1);
        }
        Err(e) => Err(e),
    }
}

/// The full `cindy` flow after the (optional) live view has been launched.
/// Pulled into its own function so `main` can guarantee the view is torn
/// down — restoring the terminal — regardless of how this returns.
async fn run_after_launch(
    subcommand: Subcommand,
    workspace_root: PathBuf,
    progress_tx: tokio::sync::mpsc::UnboundedSender<progress::Msg>,
) -> eyre::Result<()> {
    let orchestrator_exe_path = compile_orchestrator(&workspace_root).await?;

    // `--reveal` must reach the orchestrator dump, which does the
    // decryption (it has the vault keys and the typed `V`); the CLI
    // only ever sees the already-revealed-or-redacted result. So the
    // flag has to be known before we pull the inventory. It applies to
    // both the `Debug` rendering and `--json`.
    let reveal_secrets = matches!(subcommand, Subcommand::Inventory { reveal: true, .. });
    let PulledInventory {
        inventory,
        debug,
        json_revealed,
    } = get_inventory(&orchestrator_exe_path, reveal_secrets).await?;

    let (limit, unprivileged) = match subcommand {
        Subcommand::Play {
            limit,
            unprivileged,
        } => (limit, unprivileged),
        Subcommand::Inventory { host, json, reveal } => {
            if json {
                // Pick the JSON source: the secrets-decrypted whole
                // inventory under `--reveal`, otherwise the sealed one.
                // `--host` narrows to that host's object so the output
                // stays a single JSON document for `jq` et al.
                let whole: serde_json::Value = if reveal {
                    match json_revealed
                        .context("orchestrator dump omitted revealed JSON despite `--reveal`")?
                    {
                        Ok(v) => v,
                        Err(e) => eyre::bail!("failed to reveal secrets for JSON output: {e}"),
                    }
                } else {
                    serde_json::to_value(&inventory)?
                };
                let value = match &host {
                    Some(host_name) => whole
                        .get("hosts")
                        .and_then(|h| h.as_array())
                        .and_then(|hosts| {
                            hosts
                                .iter()
                                .find(|h| h.get("name").and_then(|n| n.as_str()) == Some(host_name))
                        })
                        .cloned()
                        .with_context(|| {
                            format!("No such host `{host_name}` found in inventory")
                        })?,
                    None => whole,
                };
                println!("{}", serde_json::to_string_pretty(&value)?);
                return Ok(());
            }
            // Print the `Debug` rendering the orchestrator produced from
            // the user's real `V`, not the JSON-`Value` `Debug` the CLI
            // would otherwise emit.
            let key = host
                .as_deref()
                .unwrap_or(cindy::inventory::WHOLE_INVENTORY_KEY);
            let rendered = debug.get(key).with_context(|| {
                if host.is_some() {
                    format!("No such host `{key}` found in inventory")
                } else {
                    "Orchestrator dump did not include a whole-inventory rendering".to_owned()
                }
            })?;
            println!("{rendered}");
            return Ok(());
        }
        Subcommand::Secret(_) => unreachable!("handled above"),
    };

    let selected_indices = limit::select_indices(
        &inventory.hosts,
        limit.as_deref(),
        |h| h.name.as_str(),
        |h| h.tags.as_slice(),
    )?;
    let selected: Vec<cindy::Host<serde_json::Value>> = selected_indices
        .into_iter()
        .map(|i| inventory.hosts[i].clone())
        .collect();

    if selected.is_empty() {
        tracing::warn!(
            limit,
            "no hosts matched the `--limit` expression; nothing to do"
        );
        return Ok(());
    }
    tracing::info!(count = selected.len(), "selected hosts after --limit");

    tracing::info!("Initiating pre-flight checks across targets...");
    let discovery_futures = selected.into_iter().map(|host| async move {
        let session = openssh::Session::connect(&host.name, openssh::KnownHosts::Add).await?;

        let uname_out = session
            .command("uname")
            .args(["-m"])
            .output()
            .await?
            .ok_or_die()?;
        let machine_arch = String::from_utf8(uname_out.stdout)?.trim().to_string();
        let triplet = format!("{machine_arch}-unknown-linux-musl");
        let platform = platforms::Platform::find(&triplet)
            .context(format!("Couldn't find platform by triplet: '{triplet}'"))?;

        let id_out = session
            .command("id")
            .args(["-u"])
            .output()
            .await?
            .ok_or_die()?;
        let use_sudo = !unprivileged && String::from_utf8(id_out.stdout)?.trim() != "0";

        Ok::<DiscoveredHost, eyre::Error>(DiscoveredHost {
            host,
            session,
            platform,
            use_sudo,
        })
    });

    let discovered_hosts = try_join_all(discovery_futures).await?;

    // The targets are known now — populate the view's (until-now empty) host
    // list. Send order defines each host's `idx`, matching the order the
    // deployment futures are enumerated below.
    for host in &discovered_hosts {
        let _ = progress_tx.send(progress::Msg::AddHost {
            name: host.host.name.clone(),
            tags: host.host.tags.clone(),
        });
    }

    let unique_platforms: HashSet<&'static platforms::Platform> =
        discovered_hosts.iter().map(|h| h.platform).collect();

    // https://github.com/rust-lang/rustup/issues/988
    tracing::info!(component = "rust-src", "installing component");
    std::process::Command::new("rustup")
        .args(["component", "add", "rust-src"])
        .output()?
        .ok_or_die()?;
    for platform in &unique_platforms {
        if matches!(platform.tier, platforms::Tier::Three) {
            continue;
        }
        tracing::info!(triplet = platform.target_triple, "installing target");
        std::process::Command::new("rustup")
            .args(["target", "add", platform.target_triple])
            .output()?
            .ok_or_die()?;
    }

    let cargo_meta = Arc::new(cargo_metadata::MetadataCommand::new().exec()?);
    let root = Arc::new(workspace_root.clone());
    let compilation_futures =
        unique_platforms.into_iter().map(|platform| {
            let meta = Arc::clone(&cargo_meta);
            let root = Arc::clone(&root);
            async move {
                Ok::<_, eyre::Error>((
                    platform,
                    compile_remote_worker(platform, &meta, &root).await?,
                ))
            }
        });
    let compilation_results = Arc::new(
        try_join_all(compilation_futures)
            .await?
            .into_iter()
            .collect::<HashMap<_, _>>(),
    );

    // Vault key preflight (fail-fast). Enumerate the vaults the play's
    // in-code `secret!`s reference, then — per host — union them with
    // that host's inventory-vars sealed-secret vaults, and confirm a
    // local `keys/<vault>.dek` exists for each. We do this for *every*
    // selected host before launching *any* of them, so a missing key
    // aborts the whole run up front instead of partway through.
    tracing::info!("Pre-flighting vault keys across targets...");
    let in_code_vaults = dump_in_code_vaults(&orchestrator_exe_path).await?;

    // host name -> serialised host context, and -> CINDY_VAULT_KEYS map.
    let mut host_contexts: HashMap<String, String> = HashMap::new();
    let mut host_vault_keys: HashMap<String, HashMap<String, String>> = HashMap::new();
    let mut preflight_failures: Vec<String> = Vec::new();

    for host in &discovered_hosts {
        let host_context_json =
            serde_json::to_string(&host.host).expect("Failed to serialise host context");
        let needed = host_needed_vaults(&in_code_vaults, &host_context_json);
        let (keys, missing) = collect_vault_keys(&needed);
        if !missing.is_empty() {
            preflight_failures.push(format!(
                "  {}: missing key(s) for vault(s) {missing:?}",
                host.host.name
            ));
        }
        host_contexts.insert(host.host.name.clone(), host_context_json);
        host_vault_keys.insert(host.host.name.clone(), keys);
    }

    if !preflight_failures.is_empty() {
        eyre::bail!(
            "vault key preflight failed — refusing to start. The following \
             hosts are missing decryption keys:\n{}\n\nProvision the key file(s) \
             (`cindy secret vault create <name>`, or copy `keys/<name>.dek` from \
             a teammate) and re-run.",
            preflight_failures.join("\n")
        );
    }
    tracing::info!("Vault key preflight passed for all targets.");

    tracing::info!("Launching parallel deployment loops...");
    let orchestrator_path = Arc::new(orchestrator_exe_path);
    let host_contexts = Arc::new(host_contexts);
    let host_vault_keys = Arc::new(host_vault_keys);

    let deployment_futures = discovered_hosts.into_iter().enumerate().map(|(idx, host)| {
        let orch_path = Arc::clone(&orchestrator_path);
        let compilation_results = Arc::clone(&compilation_results);
        let host_contexts = Arc::clone(&host_contexts);
        let host_vault_keys = Arc::clone(&host_vault_keys);
        let progress_tx = progress_tx.clone();

        async move {
            let hostname = host.host.name.clone();
            let context_span = tracing::info_span!("run", %hostname);
            let progress_status = progress_tx.clone();

            let result = async move {
                let local_worker_path = compilation_results
                    .get(host.platform)
                    .expect("Not compiled");

                let remote_tmp_path = format!(
                    "/tmp/.cindy-worker.{}",
                    rand::distr::SampleString::sample_string(&rand::distr::Alphanumeric, &mut rand::rng(), 16),
                );

                // Upload worker binary executable.
                //
                // stdout/stderr are piped (not inherited): if `sudo` refuses
                // — e.g. the host needs a password — its message must land in
                // this host's output pane, not on the real terminal where it
                // would corrupt the alternate-screen TUI. We forward the
                // upload's stderr the same way we do the run's.
                let mut upload = host
                    .session
                    .conditional_sudo_command(host.use_sudo, "sh")
                    .args(["-c", &format!("umask 0266 && cat >'{remote_tmp_path}' && chmod u+x '{remote_tmp_path}'")])
                    .stdin(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stderr(Stdio::piped())
                    .spawn()
                    .await?;

                let upload_stderr = BufReader::new(upload.stderr().take().unwrap());
                tokio::spawn(forward_stderr(
                    upload_stderr,
                    progress_tx.clone(),
                    idx,
                    progress::Source::Worker,
                ));

                if let Some(mut stdin) = upload.stdin().take() {
                    // If the remote command died before we finished writing
                    // (e.g. `sudo -n` exited because a password was required),
                    // the write fails with a broken pipe. That error is just a
                    // symptom — the real cause is on the command's stderr,
                    // already forwarded to the pane — so don't surface it;
                    // fall through to `exit_ok`, which reports the actual
                    // non-zero exit.
                    let bin = std::fs::read(local_worker_path)?;
                    let _ = stdin.write_all(&bin).await;
                }
                upload.wait().await?.exit_ok()?;

                // The host context (name, tags, vars) and the vault DEKs
                // this host needs were both computed during preflight.
                let host_context_json = host_contexts
                    .get(&hostname)
                    .expect("preflight populated every host's context")
                    .clone();
                let vault_keys_json = serde_json::to_string(
                    host_vault_keys
                        .get(&hostname)
                        .expect("preflight populated every host's vault keys"),
                )
                .expect("Failed to serialise vault key map");

                // The worker gets *no* secret material in its argv/env:
                // its vault DEKs travel over the RPC handshake (see the
                // orchestrator below), so they never appear in the
                // target's process table. It also doesn't need the host
                // context — that's an orchestrator-only concern.
                let mut remote_run = host
                    .session
                    .conditional_sudo_command(host.use_sudo, &remote_tmp_path)
                    .stdin(Stdio::piped())
                    .stdout(Stdio::piped())
                    .stderr(Stdio::piped())
                    .spawn()
                    .await?;

                // The orchestrator runs locally (operator's machine), so
                // env is a fine channel for both its host context and the
                // vault DEKs. It installs the DEKs for its own
                // orchestrator-side reveals and forwards them to the
                // worker over the RPC handshake.
                let mut orchestrator_run = tokio::process::Command::new(&*orch_path)
                    .env("CINDY_HOST_CONTEXT", &host_context_json)
                    .env("CINDY_VAULT_KEYS", &vault_keys_json)
                    .stdin(std::process::Stdio::piped())
                    .stdout(std::process::Stdio::piped())
                    .stderr(std::process::Stdio::piped())
                    .spawn()?;

                let mut r_stdout = remote_run.stdout().take().unwrap();
                let mut r_stdin = remote_run.stdin().take().unwrap();
                let r_stderr = BufReader::new(remote_run.stderr().take().unwrap());

                let mut o_stdout = orchestrator_run.stdout.take().unwrap();
                let mut o_stdin = orchestrator_run.stdin.take().unwrap();
                let o_stderr = BufReader::new(orchestrator_run.stderr.take().unwrap());

                let bridge_c =
                    forward_stderr(r_stderr, progress_tx.clone(), idx, progress::Source::Worker);
                let bridge_d = forward_stderr(
                    o_stderr,
                    progress_tx.clone(),
                    idx,
                    progress::Source::Orchestrator,
                );

                tokio::spawn(bridge_c);
                tokio::spawn(bridge_d);

                // Pump the RPC pipe in both directions until it closes. We
                // only need this to *finish*; whether the run actually
                // succeeded is decided by the process exit codes below — a
                // worker panic closes its stdout and would otherwise look
                // just like a clean EOF here.
                tokio::select! {
                    biased; // needed to avoid random error prints in case of successful exit
                    res = tokio::io::copy(&mut r_stdout, &mut o_stdin) => {
                        tracing::debug!("Remote stdout -> Orchestrator stdin stream closed: {:?}", res);
                    }
                    res = async {
                        let _ = tokio::io::copy(&mut o_stdout, &mut r_stdin).await;
                        let _ = r_stdin.shutdown().await;
                    } => {
                        tracing::debug!("Orchestrator stdout -> Remote stdin stream closed: {:?}", res);
                    }
                }

                // The orchestrator holds the authoritative exit code: it runs
                // the user's play and exits 0 on success, 1 on a play error or
                // a (remote-or-local) panic. The worker, by contrast, stays
                // alive serving RPC and won't exit on its own, so we wait the
                // orchestrator and then tear the worker down.
                let orch_exit = orchestrator_run.wait().await?;
                // The worker stays alive serving RPC; closing the pipe to it
                // (by dropping the child, which we took its stdio out of) ends
                // its read loop so it exits.
                drop(remote_run);
                if !orch_exit.success() {
                    eyre::bail!("play failed (orchestrator exited with {orch_exit})");
                }

                tracing::info!("Execution successfully completed.");
                Ok::<(), eyre::Error>(())
            }
            .instrument(context_span)
            .await;

            // Report terminal status from the outcome. We don't propagate
            // the error here: aborting the join would tear down the TUI the
            // moment one host fails. Failures are surfaced in the view and
            // summarised after it closes.
            //
            // An error raised in the deployment closure itself (SSH upload,
            // spawn, `exit_ok`, or the orchestrator exiting non-zero) carries
            // its message in the `eyre::Error` value, *not* on any child's
            // stderr — so unless we push it into the host's output pane here,
            // a host can flip to ✗ with an empty pane and no stated reason.
            // Emit the full cause chain as orchestrator-sourced lines.
            let status = if let Err(e) = &result {
                for line in format!("{e:?}").lines() {
                    let _ = progress_status.send(progress::Msg::Line(
                        idx,
                        progress::Source::Orchestrator,
                        line.to_owned(),
                    ));
                }
                progress::Status::Failed
            } else {
                progress::Status::Finished
            };
            let _ = progress_status.send(progress::Msg::Status(idx, status));
            result
        }
    });

    // Materialise the futures (each captures its own `progress_tx` clone),
    // then drop our keeper handle so the view's channel can close once every
    // host future has finished. The view itself is owned by `main`, which
    // awaits it after we return (so the terminal is always restored).
    let deployment_futures: Vec<_> = deployment_futures.collect();
    drop(progress_tx);

    let results = futures_util::future::join_all(deployment_futures).await;

    let failed = results.iter().filter(|r| r.is_err()).count();
    if failed > 0 {
        // The per-host failures (including any panic output) were already
        // shown in the run view; report a one-line summary as the error so
        // `main` can tear the view down first, then exit non-zero without an
        // eyre backtrace.
        eyre::bail!(RunFailed(failed));
    }
    Ok(())
}

/// Marker error meaning "this many hosts failed".
///
/// It deliberately wraps nothing but the count: the individual host failures
/// have already been displayed in the run view, so the only thing left to
/// report is a short summary (its `Display` text) alongside a non-zero exit.
#[derive(Debug)]
struct RunFailed(usize);

impl std::error::Error for RunFailed {}

impl std::fmt::Display for RunFailed {
    /// Renders the one-line summary, e.g. `3 host(s) failed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(failed_hosts) = self;
        write!(f, "{failed_hosts} host(s) failed")
    }
}

/// Query the orchestrator binary for the vaults referenced from code (the
/// "secrets in code" half of the preflight).
///
/// The binary is launched with `CINDY_DUMP_VAULTS=1`, no stdin, and both
/// output streams captured. Its stdout is then parsed as a JSON array of
/// vault names.
///
/// # Errors
/// Fails if the process cannot be run, if it exits unsuccessfully (the error
/// then carries the exit status and the captured stderr), or if stdout does
/// not deserialise into a list of strings.
async fn dump_in_code_vaults(orchestrator_exe_path: &Path) -> eyre::Result<Vec<String>> {
    use std::process::Stdio;

    let mut probe = tokio::process::Command::new(orchestrator_exe_path);
    probe
        .env("CINDY_DUMP_VAULTS", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    let captured = probe.output().await?;
    if captured.status.success() {
        return serde_json::from_slice(&captured.stdout)
            .context("Failed to parse vault list from orchestrator");
    }

    // Non-zero exit: surface whatever the binary wrote to stderr verbatim
    // (lossily decoded, so odd bytes cannot mask the failure).
    let stderr_text = String::from_utf8_lossy(&captured.stderr);
    eyre::bail!(
        "vault enumeration exited with {:?}\n{}",
        captured.status,
        stderr_text
    );
}

/// Load the data-encryption key for every vault in `needed`.
///
/// For each vault name, the key file located by
/// `cindy::secret::keychain::dek_path` is read and its raw bytes are
/// base64-encoded (standard alphabet). The result is a pair:
///
/// * the `{vault: base64-DEK}` map destined for `CINDY_VAULT_KEYS`, and
/// * the names of vaults whose key file could not be read (absent or
///   otherwise unreadable), in the iteration order of `needed`.
///
/// Read failures are collected rather than returned as an error so the
/// caller can report every missing key at once, before the run starts.
fn collect_vault_keys(
    needed: &std::collections::BTreeSet<String>,
) -> (HashMap<String, String>, Vec<String>) {
    use base64::Engine as _;

    let encoder = base64::engine::general_purpose::STANDARD;
    let mut keys = HashMap::new();
    let mut unreadable = Vec::new();

    for name in needed {
        let Ok(dek) = std::fs::read(cindy::secret::keychain::dek_path(name)) else {
            // The reason for the failure is intentionally not distinguished:
            // any unreadable key counts as "missing".
            unreadable.push(name.clone());
            continue;
        };
        keys.insert(name.clone(), encoder.encode(&dek));
    }

    (keys, unreadable)
}

/// Work out which vaults one host requires.
///
/// The set starts as the `in_code` vault names (identical for every host).
/// `cindy::inventory::collect_sealed_vaults` then adds whatever it finds in
/// the host's serialised context. If `host_context_json` does not parse as
/// JSON, the result is just the in-code set.
fn host_needed_vaults(
    in_code: &[String],
    host_context_json: &str,
) -> std::collections::BTreeSet<String> {
    use std::collections::BTreeSet;

    let mut vaults = BTreeSet::<String>::new();
    vaults.extend(in_code.iter().cloned());

    let parsed = serde_json::from_str::<serde_json::Value>(host_context_json);
    if let Ok(context) = parsed {
        cindy::inventory::collect_sealed_vaults(&context, &mut vaults);
    }

    vaults
}

/// Build the orchestrator binary and return the path of the produced
/// executable.
///
/// Runs `cargo build --release --no-default-features --features orchestrator`
/// with `--message-format=json`, directing output to
/// `<workspace_root>/target/cindy/orchestrator`. The executable path is taken
/// from the last JSON message on stdout that has a string `executable` field.
///
/// # Errors
/// Fails if `cargo` cannot be run, if the build exits unsuccessfully (the
/// error message is cargo's stderr), if stdout is not valid UTF-8, or if no
/// message reports an executable.
async fn compile_orchestrator(workspace_root: &Path) -> eyre::Result<PathBuf> {
    tracing::info!("compiling orchestrator binary");
    let target_dir = workspace_root.join("target/cindy/orchestrator");
    let output = tokio::process::Command::new("cargo")
        .args([
            "build",
            "--release",
            "--message-format=json",
            "--no-default-features",
            "--features",
            "orchestrator",
            "--target-dir",
        ])
        .arg(&target_dir)
        .output()
        .await
        // Without this, a missing `cargo` surfaces as a bare OS error with no
        // hint about which command failed to start.
        .context("Failed to run `cargo build` for the orchestrator")?;

    if !output.status.success() {
        // Decode lossily: a build failure must be reported as an error, never
        // turned into a panic because stderr contained a non-UTF-8 byte.
        eyre::bail!("{}", String::from_utf8_lossy(&output.stderr))
    }

    let stdout_str = String::from_utf8(output.stdout)?;
    // Lines that are not JSON, or have no `executable` string, are skipped;
    // the last match wins.
    let exe_path = stdout_str
        .lines()
        .filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
        .filter_map(|json| json["executable"].as_str().map(PathBuf::from))
        .next_back()
        .context("Failed to extract orchestrator executable path")?;
    Ok(exe_path)
}

/// Cross-compile the remote worker for `platform` and return the host path of
/// the produced executable.
///
/// Runs `cross build --release --target <triple> --no-default-features
/// --features remote` with `--message-format=json`, directing output to
/// `<workspace_root>/target/cindy/remote/<triple>`. Tier-3 targets
/// additionally get `-Zbuild-std`. When the `cindy` package in `metadata` is
/// a path dependency (it has no `source`), its workspace root is bind-mounted
/// into the container through `CROSS_CONTAINER_OPTS`.
///
/// # Errors
/// Fails if `cross` cannot be run, if `cargo metadata` for the framework
/// package fails, if the build exits unsuccessfully (the error message is the
/// build's stderr), if stdout is not valid UTF-8, or if no build message
/// reports an executable with a file name.
#[tracing::instrument(skip_all, fields(triplet = platform.target_triple))]
async fn compile_remote_worker(
    platform: &'static platforms::Platform,
    metadata: &cargo_metadata::Metadata,
    workspace_root: &Path,
) -> eyre::Result<PathBuf> {
    tracing::info!(platform.target_triple, "compiling remote binary");
    // Absolute so the output lands under the workspace regardless of cwd, and
    // so we can reconstruct the binary path from it below without relying on
    // cargo's container-relative `executable` field.
    let target_dir = workspace_root.join(format!("target/cindy/remote/{}", platform.target_triple));
    let mut cmd = tokio::process::Command::new("cross");
    cmd.args([
        "build",
        "--release",
        "--message-format=json",
        "--target",
        platform.target_triple,
        "--no-default-features",
        "--features",
        "remote",
        "--target-dir",
    ])
    .arg(&target_dir);

    if matches!(platform.tier, platforms::Tier::Three) {
        tracing::warn!("tier 3 target, building with `-Zbuild-std`");
        cmd.arg("-Zbuild-std");
    }

    if let Some(pkg) = metadata.packages.iter().find(|p| p.name == "cindy")
        && pkg.source.is_none()
        && let Some(pkg_dir) = pkg.manifest_path.parent()
    {
        // NOTE(review): `exec()` runs `cargo metadata` synchronously, so this
        // blocks the async task while it runs — confirm that is acceptable.
        let framework_metadata = cargo_metadata::MetadataCommand::new()
            .current_dir(pkg_dir)
            .exec()?;
        let framework_workspace = framework_metadata.workspace_root;

        // Mount the framework workspace at the same path inside the
        // container.
        cmd.env(
            "CROSS_CONTAINER_OPTS",
            format!("-v {framework_workspace}:{framework_workspace}"),
        );
    }

    let output = cmd
        .output()
        .await
        // Without this, a missing `cross` surfaces as a bare OS error with no
        // hint about which command failed to start.
        .context("Failed to run `cross build` for the remote worker")?;
    if !output.status.success() {
        // Decode lossily: a build failure must be reported as an error, never
        // turned into a panic because stderr contained a non-UTF-8 byte.
        eyre::bail!("{}", String::from_utf8_lossy(&output.stderr))
    }
    // `cross` runs cargo inside a container, so the `executable` it reports is
    // a container path that may not exist on the host. We only trust it for
    // the binary's *file name*, then rebase that onto our own absolute
    // target-dir (mounted into the container at the same path), giving a host
    // path we can actually read.
    let stdout_str = String::from_utf8(output.stdout)?;
    let exe_name = stdout_str
        .lines()
        .filter_map(|l| serde_json::from_str::<serde_json::Value>(l).ok())
        .filter_map(|json| json["executable"].as_str().map(PathBuf::from))
        .next_back()
        .context("Failed to extract remote executable path")?
        .file_name()
        .context("remote executable path had no file name")?
        .to_owned();

    let exe_path = target_dir
        .join(platform.target_triple)
        .join("release")
        .join(exe_name);

    Ok(exe_path)
}