snapdir-cli 1.4.0

snapdir CLI: the `snapdir` binary exposing all subcommands.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
//! Stateful end-to-end CLI tests using `assert_cmd` + `assert_fs`.
//!
//! Unlike the static `trycmd` surface snapshots, these drive the *wired*
//! commands against real temp trees and a temp `file://` store, asserting real
//! behavior:
//!
//! - `manifest` / `id` over a known tiny tree: the id is 64 lowercase hex. (The
//!   frozen byte-format contract is pinned separately by
//!   `crates/snapdir-core/tests/compat_golden.rs` against recorded constants.)
//! - a `push -> fetch -> checkout` and `push -> pull` round-trip over a temp
//!   `file://` store: the printed id equals the source id, the checked-out tree
//!   re-manifests to the same id (contents + permissions reproduced), and
//!   `verify` accepts the intact snapshot.
//!
//! The store/cache live under `assert_fs` temp dirs that are removed on drop, so
//! these tests are hermetic and need no network or credentials.

use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::Command;

use assert_cmd::prelude::*;
use assert_fs::prelude::*;
use assert_fs::TempDir;
use predicates::prelude::*;

/// Returns a new `snapdir` command whose `SNAPDIR_CACHE_DIR` environment
/// variable points at `cache`, keeping every test away from the user's real
/// `$HOME/.cache/snapdir`.
///
/// # Panics
///
/// Panics if the `snapdir` binary cannot be located via `cargo_bin`.
fn snapdir(cache: &Path) -> Command {
    let mut command = Command::cargo_bin("snapdir").expect("snapdir binary built");
    command.env("SNAPDIR_CACHE_DIR", cache);
    command
}

/// Builds a known tiny tree with explicit, deterministic permissions so a
/// checked-out copy must restore them to re-manifest to the same id.
fn build_tree(dir: &TempDir) {
    dir.child("a.txt").write_str("hello").unwrap();
    std::fs::set_permissions(dir.child("a.txt").path(), PermissionsExt::from_mode(0o644)).unwrap();
    dir.child("sub/b.txt").write_str("world!!").unwrap();
    std::fs::set_permissions(
        dir.child("sub/b.txt").path(),
        PermissionsExt::from_mode(0o600),
    )
    .unwrap();
    std::fs::set_permissions(dir.child("sub").path(), PermissionsExt::from_mode(0o755)).unwrap();
    std::fs::set_permissions(dir.path(), PermissionsExt::from_mode(0o755)).unwrap();
}

/// Invokes `snapdir` with `args` (cache pinned to `cache`) and returns its
/// stdout with trailing whitespace removed.
///
/// # Panics
///
/// Panics if the process cannot be run, exits unsuccessfully (the message then
/// carries the exit code and stderr), or prints stdout that is not valid UTF-8.
fn stdout_ok(cache: &Path, args: &[&str]) -> String {
    let output = snapdir(cache).args(args).output().expect("run snapdir");
    if !output.status.success() {
        panic!(
            "snapdir {args:?} failed ({:?})\nstderr: {}",
            output.status.code(),
            String::from_utf8_lossy(&output.stderr),
        );
    }
    let text = String::from_utf8(output.stdout).unwrap();
    text.trim_end().to_owned()
}

/// `id` over the known tiny tree prints exactly 64 characters, all of them
/// lowercase hexadecimal digits.
#[test]
fn id_is_64_lowercase_hex() {
    let cache = TempDir::new().unwrap();
    let tree = TempDir::new().unwrap();
    build_tree(&tree);
    let tree_arg = tree.path().to_string_lossy().into_owned();

    let id = stdout_ok(cache.path(), &["id", &tree_arg]);
    assert_eq!(id.len(), 64, "snapshot id must be 64 hex chars: {id:?}");

    // A hex digit that is not an uppercase letter is `0-9` or `a-f`.
    let is_lower_hex = |c: char| c.is_ascii_hexdigit() && !c.is_ascii_uppercase();
    assert!(
        id.chars().all(is_lower_hex),
        "snapshot id must be lowercase hex: {id:?}"
    );
}

/// Round-trips the known tree through `push`, `fetch` and `checkout` against a
/// temp `file://` store: `push` must print the id that `id` reports for the
/// source, the checked-out destination must hold the same contents and
/// re-manifest to that id, and `verify` must accept the snapshot in the store.
#[test]
fn push_fetch_checkout_roundtrip_reproduces_id() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    // One `snapdir` invocation against the shared cache that must exit 0.
    let run_ok = |args: &[&str]| {
        snapdir(cache.path()).args(args).assert().success();
    };

    // The id is independent of any store; it is the reference for every
    // equality check below.
    let expected_id = stdout_ok(cache.path(), &["id", &src_arg]);

    // `push` reports the id of what it uploaded.
    let pushed_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_arg]);
    assert_eq!(
        pushed_id, expected_id,
        "push must print the source snapshot id"
    );

    // `fetch` fills the cache; `checkout` then needs no `--store` because it
    // reads from the cache only.
    run_ok(&["fetch", "--store", &store_url, "--id", &expected_id]);
    run_ok(&["checkout", "--id", &expected_id, &dest_arg]);

    // Contents are reproduced byte for byte...
    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    // ...and so are the permissions, otherwise the id would differ.
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        dest_id, expected_id,
        "checked-out tree must re-manifest to the source id"
    );

    // The untouched snapshot in the store passes verification.
    run_ok(&["verify", "--store", &store_url, "--id", &expected_id]);
}

/// `verify --purge` is an error rather than a silently ignored flag: `verify`
/// is a store-based integrity check that never touches the cache, so the global
/// `--purge` flag would be inert there. The command must fail with a message
/// naming both `verify` and `--purge` (pointing the user at
/// `verify-cache --purge`). Because the rejection happens before any store
/// resolution, a bogus store and id still surface the purge error.
#[test]
fn verify_purge_is_rejected() {
    let cache = TempDir::new().unwrap();
    let bogus_id = "0".repeat(64);

    let args = [
        "verify",
        "--store",
        "file:///tmp/nonexistent-snapdir-verify-purge",
        "--id",
        &bogus_id,
        "--purge",
    ];
    let names_verify_and_purge =
        predicate::str::contains("verify").and(predicate::str::contains("--purge"));

    let mut command = snapdir(cache.path());
    command.args(args);
    command.assert().failure().stderr(names_verify_and_purge);
}

/// Control case for the purge rejection: `verify` without `--purge` against the
/// same bogus store still fails (the manifest is missing there), but its stderr
/// must not carry the purge rejection message.
#[test]
fn verify_without_purge_does_not_hit_purge_error() {
    let cache = TempDir::new().unwrap();
    let bogus_id = "0".repeat(64);

    let args = [
        "verify",
        "--store",
        "file:///tmp/nonexistent-snapdir-verify-purge",
        "--id",
        &bogus_id,
    ];
    let no_purge_message = predicate::str::contains("does not support --purge").not();

    let mut command = snapdir(cache.path());
    command.args(args);
    command.assert().failure().stderr(no_purge_message);
}

/// A single `pull` does the work of `fetch` followed by `checkout`: after
/// pushing the known tree, pulling its id into a fresh destination reproduces
/// the contents and re-manifests to the pushed id.
#[test]
fn pull_is_fetch_plus_checkout() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let pushed_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_arg]);

    // One step instead of fetch + checkout.
    let mut pull = snapdir(cache.path());
    pull.args(["pull", "--store", &store_url, "--id", &pushed_id, &dest_arg]);
    pull.assert().success();

    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(dest_id, pushed_id);
}

/// `--jobs` / `--limit-rate` are accepted by a real `push` and `pull` against a
/// `file://` store and leave the result unchanged: the pushed id equals the
/// source id and the pulled tree re-manifests to it. This drives the whole
/// flag → `TransferConfig` → store path end to end.
#[test]
fn transfer_flags_push_pull_roundtrip() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let expected_id = stdout_ok(cache.path(), &["id", &src_arg]);

    // Push with both a concurrency and a bandwidth cap spelled out.
    let push_args = [
        "push",
        "--store",
        &store_url,
        "--jobs",
        "2",
        "--limit-rate",
        "1M",
        &src_arg,
    ];
    let pushed_id = stdout_ok(cache.path(), &push_args);
    assert_eq!(
        pushed_id, expected_id,
        "push with transfer flags must print the id"
    );

    // Pull through the short `-j` alias, capped to one job.
    let pull_args = [
        "pull",
        "--store",
        &store_url,
        "--id",
        &expected_id,
        "-j",
        "1",
        "--limit-rate",
        "512K",
        &dest_arg,
    ];
    let mut pull = snapdir(cache.path());
    pull.args(pull_args);
    pull.assert().success();

    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        dest_id, expected_id,
        "tree pulled with transfer flags must re-manifest to the source id"
    );
}

/// Re-pulling or re-fetching an id that is already cached must read ZERO
/// objects from the store: the cache has the manifest, and because the manifest
/// is written last it has every referenced object as well. The proof is
/// destructive: after the first pull the store's `.objects` subtree is deleted
/// (`.manifests` stays), then the SAME id is pulled again. A fetch leg that
/// still went to the store would fail with `object not found`; the second pull
/// must instead succeed and its destination must re-manifest to the same id,
/// showing real materialization rather than a silent skip.
#[test]
fn fetch_cached_skips_store_objects() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    let dest2 = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let first_dest = dest.path().to_string_lossy().into_owned();
    let second_dest = dest2.path().to_string_lossy().into_owned();

    // One `snapdir` invocation against the shared cache that must exit 0.
    let run_ok = |args: &[&str]| {
        snapdir(cache.path()).args(args).assert().success();
    };

    // Push, then the first pull: this fills the cache AND the first destination.
    let pushed_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_arg]);
    run_ok(&["pull", "--store", &store_url, "--id", &pushed_id, &first_dest]);
    let first_id = stdout_ok(cache.path(), &["id", &first_dest]);
    assert_eq!(first_id, pushed_id);

    // Remove the store's objects but keep its manifest. From here on any
    // object read from the store fails.
    let store_objects = store.path().join(".objects");
    assert!(store_objects.exists(), "store must have an .objects subtree");
    std::fs::remove_dir_all(&store_objects).expect("remove store .objects subtree");
    assert!(store.path().join(".manifests").exists(), "manifest kept");

    // The second pull of the SAME id, into a fresh destination, can only
    // succeed if it is served entirely from the cache.
    run_ok(&["pull", "--store", &store_url, "--id", &pushed_id, &second_dest]);
    dest2.child("a.txt").assert("hello");
    dest2.child("sub/b.txt").assert("world!!");
    let second_id = stdout_ok(cache.path(), &["id", &second_dest]);
    assert_eq!(
        second_id, pushed_id,
        "cache-served pull must reproduce the source id"
    );

    // A plain `fetch` of the already-cached id succeeds as well.
    run_ok(&["fetch", "--store", &store_url, "--id", &pushed_id]);
}

/// `fetch` without `--store` must fail and say on stderr that the option is
/// missing.
#[test]
fn fetch_without_store_fails_with_clear_message() {
    let cache = TempDir::new().unwrap();
    let bogus_id = "0".repeat(64);

    let mut fetch = snapdir(cache.path());
    fetch.args(["fetch", "--id", &bogus_id]);
    fetch
        .assert()
        .failure()
        .stderr(predicate::str::contains("missing --store option"));
}

/// `checkout` of an id that was never fetched into the cache must fail: the
/// cache is brand new, so no manifest exists for it.
#[test]
fn checkout_unknown_id_fails() {
    let cache = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    let dest_arg = dest.path().to_string_lossy().into_owned();
    let unknown_id = "0".repeat(64);

    // The cache is empty, so there is no manifest to check out.
    let mut checkout = snapdir(cache.path());
    checkout.args(["checkout", "--id", &unknown_id, &dest_arg]);
    checkout.assert().failure();
}

// ---------------------------------------------------------------------------
// pull-push-correctness-suite: binary-level regression tests for the four
// correctness properties the operator was worried about. These build on top of
// the existing round-trip tests above; each is hermetic (temp `file://` store +
// temp cache, both removed on drop).
// ---------------------------------------------------------------------------

/// Recursively counts every non-directory entry (regular files, symlinks, and
/// any other file type) anywhere under `dir`, returning 0 if `dir` does not
/// exist. A content-addressable store/cache/destination materializes its state
/// as such entries, so a count of zero means nothing was written.
///
/// Entries are classified with `DirEntry::file_type`, which does not follow
/// symlinks. A symlink therefore always counts as one written entry, even when
/// it points at an empty directory, and a symlink cycle cannot cause unbounded
/// recursion.
///
/// # Panics
///
/// Panics if a directory cannot be read or an entry's type cannot be
/// determined.
fn count_files(dir: &Path) -> usize {
    if !dir.exists() {
        return 0;
    }
    std::fs::read_dir(dir)
        .expect("read_dir")
        .map(|entry| {
            let entry = entry.expect("dir entry");
            // `file_type` reports the entry itself, not a symlink's target.
            let kind = entry.file_type().expect("file type");
            if kind.is_dir() {
                count_files(&entry.path())
            } else {
                1
            }
        })
        .sum()
}

/// Scenario 1 — **push → pull → pull idempotency.** Two pulls of the same id
/// into the SAME destination must both exit 0, and after each one the
/// destination must re-manifest to the source id (contents and permissions
/// intact). `fetch_cached_skips_store_objects` already proves the second pull
/// needs no store objects; this test covers the complementary property that a
/// repeated pull leaves the destination stable.
#[test]
fn push_pull_pull_is_idempotent() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let pushed_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_arg]);

    // Pulls the pushed id into the one shared destination and requires exit 0.
    let pull_into_dest = || {
        snapdir(cache.path())
            .args(["pull", "--store", &store_url, "--id", &pushed_id, &dest_arg])
            .assert()
            .success();
    };

    // First pull: materializes the tree.
    pull_into_dest();
    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    let after_first = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        after_first, pushed_id,
        "first pull must reproduce the source id"
    );

    // Second pull into the SAME destination: nothing may change.
    pull_into_dest();
    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    let after_second = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        after_second, pushed_id,
        "repeated pull must leave the destination re-manifesting to the same id"
    );
}

/// Scenario 2 — **--dryrun makes no writes (e2e level).** `push --dryrun`
/// against an empty `file://` store must leave it empty (neither `.objects` nor
/// `.manifests` appears), and `pull --dryrun` into a fresh destination with a
/// fresh cache must leave both of those empty too. The overlap with
/// `tests/dryrun.rs` is deliberate: THIS gate's verification runs only e2e +
/// `store_roundtrip`, and it should still exercise the dry-run invariant.
#[test]
fn dryrun_push_leaves_store_empty_e2e() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();

    // The id is pure computation, so a dry-run push still prints it...
    let dry_args = ["push", "--dryrun", "--store", &store_url, &src_arg];
    let dry_id = stdout_ok(cache.path(), &dry_args);
    assert_eq!(dry_id.len(), 64, "push --dryrun must still print the id");

    // ...but nothing may have landed in the store.
    let store_objects = store.path().join(".objects");
    assert!(
        !store_objects.exists(),
        "push --dryrun must not create any store objects"
    );
    let store_manifests = store.path().join(".manifests");
    assert!(
        !store_manifests.exists(),
        "push --dryrun must not create any store manifests"
    );
    let files_in_store = count_files(store.path());
    assert_eq!(
        files_in_store, 0,
        "store must remain empty after push --dryrun"
    );

    // Push for real into a second store, so the dry-run pull below has a
    // snapshot it could materialize.
    let realstore = TempDir::new().unwrap();
    let real_url = format!("file://{}", realstore.path().display());
    let pushcache = TempDir::new().unwrap();
    let real_id = stdout_ok(pushcache.path(), &["push", "--store", &real_url, &src_arg]);

    // Dry-run pull with a brand-new destination and a brand-new cache.
    let dest = TempDir::new().unwrap();
    let pullcache = TempDir::new().unwrap();
    let dest_arg = dest.path().to_string_lossy().into_owned();
    let mut dry_pull = snapdir(pullcache.path());
    dry_pull.args([
        "pull", "--dryrun", "--store", &real_url, "--id", &real_id, &dest_arg,
    ]);
    dry_pull.assert().success();

    let files_in_dest = count_files(dest.path());
    assert_eq!(
        files_in_dest, 0,
        "pull --dryrun must not materialize any destination files"
    );
    let files_in_cache = count_files(pullcache.path());
    assert_eq!(
        files_in_cache, 0,
        "pull --dryrun must not write to the cache"
    );
}

/// Scenario 3 — **corrupted local file is detected and repaired on pull.**
/// Once a push + pull has filled the cache and the destination, one destination
/// file is overwritten with wrong bytes. Pulling again into the SAME
/// destination must repair it: the file's contents come back and the
/// destination re-manifests to the source id. On that second pull the fetch leg
/// is a cache-hit no-op (the manifest is already cached), while the checkout
/// leg sees that the corrupt file's local checksum no longer matches and
/// rewrites it from the cache. The repair therefore works offline, which the
/// test proves by deleting the store's `.objects` before the repairing pull.
#[test]
fn pull_repairs_corrupted_dest_file() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let pushed_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_arg]);

    // Pulls the pushed id into the one shared destination and requires exit 0.
    let pull_into_dest = || {
        snapdir(cache.path())
            .args(["pull", "--store", &store_url, "--id", &pushed_id, &dest_arg])
            .assert()
            .success();
    };

    // First pull: fills the cache and the destination.
    pull_into_dest();
    dest.child("a.txt").assert("hello");
    let clean_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(clean_id, pushed_id);

    // Overwrite one destination file with different bytes of a different length.
    let victim = dest.child("a.txt");
    victim.write_str("CORRUPTED-WRONG-BYTES").unwrap();
    let corrupted_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_ne!(
        corrupted_id, pushed_id,
        "corrupting the file must change the re-manifested id"
    );

    // With the store's objects gone, the repair below can only be served from
    // the cache (offline), not by re-reading the store.
    let store_objects = store.path().join(".objects");
    assert!(store_objects.exists(), "store must have an .objects subtree");
    std::fs::remove_dir_all(&store_objects).expect("remove store .objects subtree");

    // Second pull into the SAME destination: the corrupted file must be fixed.
    pull_into_dest();
    dest.child("a.txt").assert("hello");
    dest.child("sub/b.txt").assert("world!!");
    let repaired_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        repaired_id, pushed_id,
        "repairing pull must restore the destination to the source id"
    );
}

// ---------------------------------------------------------------------------
// transfer-concurrency-verification (phase 13): end-to-end proof that
// `--jobs` / `--limit-rate` are wired through the binary and that concurrency
// does not change the materialized result. All hermetic via a temp `file://`
// store (the aggregate `RateLimiter` is network-only; `FileStore` does NOT throttle
// local copies, so the deterministic throttle proof lives in transfer-config's
// `RateLimiter` timing unit test, not here). These tests prove flag acceptance,
// threading, and byte-identical correctness through the concurrent `FileStore`
// path. Fn names start with `transfer_concurrency` so
// `cargo test -p snapdir-cli --locked transfer_concurrency` selects them.
// ---------------------------------------------------------------------------

/// Populates `dir` with a *multi-file* tree (eight files spread over nested
/// directories) and pins the mode of every file and directory. This gives a
/// concurrent push/pull real fan-out, and means a checked-out copy must restore
/// both contents and permissions to re-manifest to the same id.
fn build_multi_tree(dir: &TempDir) {
    // (relative path, contents, mode) for every file in the tree.
    const FILES: [(&str, &str, u32); 8] = [
        ("top1.txt", "alpha", 0o644),
        ("top2.bin", "bravo-bravo", 0o600),
        ("dir_a/a1.txt", "charlie", 0o644),
        ("dir_a/a2.txt", "delta-delta-delta", 0o640),
        ("dir_a/nested/deep.txt", "echo!!", 0o644),
        ("dir_b/b1.txt", "foxtrot", 0o600),
        ("dir_b/b2.txt", "golf", 0o644),
        ("dir_b/sub/c/leaf.dat", "hotel-hotel", 0o644),
    ];
    // Directories whose mode is pinned, so directory perms feed into the id.
    const DIRS: [&str; 5] = ["dir_a", "dir_a/nested", "dir_b", "dir_b/sub", "dir_b/sub/c"];

    for &(rel, body, mode) in &FILES {
        let file = dir.child(rel);
        file.write_str(body).unwrap();
        std::fs::set_permissions(file.path(), PermissionsExt::from_mode(mode)).unwrap();
    }

    for &rel in &DIRS {
        let subdir = dir.child(rel);
        std::fs::set_permissions(subdir.path(), PermissionsExt::from_mode(0o755)).unwrap();
    }

    // The root directory is pinned last.
    std::fs::set_permissions(dir.path(), PermissionsExt::from_mode(0o755)).unwrap();
}

/// Pushes the multi-file tree with `push --jobs 4` to a `file://` store and
/// pulls it back with `pull --jobs 4` into a fresh destination. Both commands
/// must exit 0, the pushed id must equal the source id, and the pulled tree
/// must re-manifest to that id, i.e. the concurrent `FileStore` path
/// materializes a byte-identical tree.
#[test]
fn transfer_concurrency_jobs4_roundtrip() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_multi_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let expected_id = stdout_ok(cache.path(), &["id", &src_arg]);

    let push_args = ["push", "--store", &store_url, "--jobs", "4", &src_arg];
    let pushed_id = stdout_ok(cache.path(), &push_args);
    assert_eq!(
        pushed_id, expected_id,
        "push --jobs 4 must print the source id"
    );

    let mut pull = snapdir(cache.path());
    pull.args([
        "pull", "--store", &store_url, "--id", &expected_id, "--jobs", "4", &dest_arg,
    ]);
    pull.assert().success();

    // Spot-check two deeply nested files, then compare the whole tree by id.
    dest.child("dir_a/nested/deep.txt").assert("echo!!");
    dest.child("dir_b/sub/c/leaf.dat").assert("hotel-hotel");
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        dest_id, expected_id,
        "tree pulled with --jobs 4 must re-manifest to the source id"
    );
}

/// The multi-file round-trip again, this time with `--jobs 1` (the sequential
/// path): the id and the materialized result must be the same. The tree is then
/// pushed once more with `--jobs 4` to a second store, and that push must print
/// the SAME snapshot id, showing that concurrency does not change the output.
#[test]
fn transfer_concurrency_jobs1_roundtrip() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_multi_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let expected_id = stdout_ok(cache.path(), &["id", &src_arg]);

    let sequential_push = ["push", "--store", &store_url, "--jobs", "1", &src_arg];
    let pushed_id = stdout_ok(cache.path(), &sequential_push);
    assert_eq!(
        pushed_id, expected_id,
        "push --jobs 1 must print the source id"
    );

    let mut pull = snapdir(cache.path());
    pull.args([
        "pull", "--store", &store_url, "--id", &expected_id, "--jobs", "1", &dest_arg,
    ]);
    pull.assert().success();

    dest.child("dir_a/a2.txt").assert("delta-delta-delta");
    dest.child("dir_b/b1.txt").assert("foxtrot");
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        dest_id, expected_id,
        "tree pulled with --jobs 1 must re-manifest to the source id"
    );

    // Same tree, a SECOND store, four jobs: the printed id has to match the
    // one from the sequential push above.
    let parallel_store = TempDir::new().unwrap();
    let parallel_url = format!("file://{}", parallel_store.path().display());
    let parallel_push = ["push", "--store", &parallel_url, "--jobs", "4", &src_arg];
    let parallel_id = stdout_ok(cache.path(), &parallel_push);
    assert_eq!(
        parallel_id, pushed_id,
        "--jobs 4 and --jobs 1 pushes must yield the same snapshot id"
    );
}

/// A `push --jobs 2 --limit-rate 1M` followed by `pull --limit-rate 512K`
/// against a `file://` store must succeed and re-manifest to the same id. That
/// shows the flag parses, reaches `TransferConfig`, and does not break
/// correctness. No timing is asserted on purpose: `FileStore` does not throttle
/// local copies (the `RateLimiter` is network-only, and its deterministic
/// timing proof lives in transfer-config's `RateLimiter` unit test).
#[test]
fn transfer_concurrency_limit_rate_accepted() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_multi_tree(&src);

    let store_url = format!("file://{}", store.path().display());
    let src_arg = src.path().to_string_lossy().into_owned();
    let dest_arg = dest.path().to_string_lossy().into_owned();

    let expected_id = stdout_ok(cache.path(), &["id", &src_arg]);

    let push_args = [
        "push",
        "--store",
        &store_url,
        "--jobs",
        "2",
        "--limit-rate",
        "1M",
        &src_arg,
    ];
    let pushed_id = stdout_ok(cache.path(), &push_args);
    assert_eq!(
        pushed_id, expected_id,
        "push --jobs 2 --limit-rate 1M must print the source id"
    );

    let pull_args = [
        "pull",
        "--store",
        &store_url,
        "--id",
        &expected_id,
        "--limit-rate",
        "512K",
        &dest_arg,
    ];
    let mut pull = snapdir(cache.path());
    pull.args(pull_args);
    pull.assert().success();

    dest.child("top1.txt").assert("alpha");
    dest.child("dir_b/sub/c/leaf.dat").assert("hotel-hotel");
    let dest_id = stdout_ok(cache.path(), &["id", &dest_arg]);
    assert_eq!(
        dest_id, expected_id,
        "tree pulled with --limit-rate must re-manifest to the source id"
    );
}

/// Scenario 4 — **multi/comma --exclude drops paths from the manifest.** The
/// known tree is extended with two excludable entries, `node_modules/x` and
/// `coverage/y`. A plain `manifest` must list them, while both the comma form
/// (`--exclude node_modules,coverage`) and the repeated form (`--exclude
/// node_modules --exclude coverage`) must leave them out of stdout, keep the
/// other files, and print identical output.
///
/// (The exclude regex is matched against the *absolute* scan path, so an
/// exclude token must not occur in the temp-dir prefix: `tmp`, for example,
/// would also match `/tmp/...` and drop the entire walk. `node_modules` and
/// `coverage` are distinctive enough to be safe.)
#[test]
fn manifest_multi_exclude_drops_paths_e2e() {
    let src = TempDir::new().unwrap();
    build_tree(&src);
    // Two extra subtrees that the exclude patterns will target.
    src.child("node_modules/x").write_str("dep").unwrap();
    src.child("coverage/y").write_str("scratch").unwrap();
    let src_arg = src.path().to_string_lossy().into_owned();

    // `manifest` is pure computation and does not need the cache, but the
    // harness always pins one; a temp cache keeps the real $HOME untouched.
    let cache = TempDir::new().unwrap();

    // Without `--exclude`, both subtrees are listed.
    let plain = stdout_ok(cache.path(), &["manifest", &src_arg]);
    for needle in ["node_modules", "coverage"] {
        assert!(
            plain.contains(needle),
            "plain manifest should include {needle}:\n{plain}"
        );
    }

    // The two spellings of a multi-pattern exclude.
    let comma_args = ["manifest", "--exclude", "node_modules,coverage", &src_arg];
    let comma = stdout_ok(cache.path(), &comma_args);
    let repeated_args = [
        "manifest",
        "--exclude",
        "node_modules",
        "--exclude",
        "coverage",
        &src_arg,
    ];
    let repeated = stdout_ok(cache.path(), &repeated_args);

    // Checks one manifest: excluded subtrees gone, known tree files still there.
    let check_filtered = |label: &str, out: &str| {
        assert!(
            !out.contains("node_modules"),
            "{label} --exclude must drop node_modules:\n{out}"
        );
        assert!(
            !out.contains("coverage"),
            "{label} --exclude must drop coverage:\n{out}"
        );
        let keeps_known_files = out.contains("./a.txt") && out.contains("./sub/b.txt");
        assert!(
            keeps_known_files,
            "{label} --exclude must keep the non-excluded files:\n{out}"
        );
    };
    check_filtered("comma", &comma);
    check_filtered("repeated", &repeated);

    assert_eq!(
        comma, repeated,
        "comma and repeated --exclude forms must produce identical manifests"
    );
}

/// `--verbose push --jobs N` prints the effective transfer concurrency to
/// stderr ONCE, while stdout stays exactly the snapshot id (byte-stable).
///
/// The captured streams are decoded before the exit status is checked so that
/// a failing run reports the binary's own stderr instead of a bare
/// "must succeed".
#[test]
fn verbose_jobs_push_reports_concurrency() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    build_tree(&src);

    let src_str = src.path().to_string_lossy().into_owned();
    let store_url = format!("file://{}", store.path().display());

    // Reference id, computed by a separate `id` run rather than by the push
    // under test.
    let src_id = stdout_ok(cache.path(), &["id", &src_str]);

    let out = snapdir(cache.path())
        .args([
            "push",
            "--jobs",
            "3",
            "--verbose",
            "--store",
            &store_url,
            &src_str,
        ])
        .output()
        .expect("run snapdir");

    let status = out.status;
    let stdout = String::from_utf8(out.stdout).expect("snapdir stdout must be valid UTF-8");
    let stderr = String::from_utf8(out.stderr).expect("snapdir stderr must be valid UTF-8");
    assert!(
        status.success(),
        "verbose push must succeed ({status}); stderr:\n{stderr}"
    );

    // Verbose diagnostics must never leak into stdout.
    assert_eq!(
        stdout.trim_end(),
        src_id,
        "stdout must remain exactly the snapshot id"
    );

    assert!(
        stderr.contains("transfers: 3 concurrent"),
        "stderr must report effective concurrency:\n{stderr}"
    );
    assert_eq!(
        stderr.matches("transfers:").count(),
        1,
        "the transfer-config banner must print exactly once:\n{stderr}"
    );
}

/// `--verbose push --jobs N --limit-rate R` reports both the concurrency and the
/// rate limit on stderr.
///
/// stderr is decoded before the exit status is checked so that a failing run
/// reports the binary's own diagnostics.
#[test]
fn verbose_jobs_limit_rate_reported() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    build_tree(&src);

    let src_str = src.path().to_string_lossy().into_owned();
    let store_url = format!("file://{}", store.path().display());

    let out = snapdir(cache.path())
        .args([
            "push",
            "--jobs",
            "2",
            "--limit-rate",
            "1M",
            "--verbose",
            "--store",
            &store_url,
            &src_str,
        ])
        .output()
        .expect("run snapdir");

    let status = out.status;
    let stderr = String::from_utf8(out.stderr).expect("snapdir stderr must be valid UTF-8");
    assert!(
        status.success(),
        "verbose limited push must succeed ({status}); stderr:\n{stderr}"
    );

    // Both halves of the transfer configuration must be visible.
    assert!(
        stderr.contains("2 concurrent") && stderr.contains("limit 1M"),
        "stderr must report concurrency AND the limit rate:\n{stderr}"
    );
}

/// Without `--verbose`, the transfer-config banner is silent and stdout is still
/// exactly the snapshot id.
///
/// The captured streams are decoded before the exit status is checked so that
/// a failing run reports the binary's own stderr.
#[test]
fn verbose_jobs_silent_without_verbose() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    build_tree(&src);

    let src_str = src.path().to_string_lossy().into_owned();
    let store_url = format!("file://{}", store.path().display());

    // Reference id, computed by a separate `id` run rather than by the push
    // under test.
    let src_id = stdout_ok(cache.path(), &["id", &src_str]);

    let out = snapdir(cache.path())
        .args(["push", "--jobs", "3", "--store", &store_url, &src_str])
        .output()
        .expect("run snapdir");

    let status = out.status;
    let stdout = String::from_utf8(out.stdout).expect("snapdir stdout must be valid UTF-8");
    let stderr = String::from_utf8(out.stderr).expect("snapdir stderr must be valid UTF-8");
    assert!(
        status.success(),
        "non-verbose push must succeed ({status}); stderr:\n{stderr}"
    );

    assert_eq!(stdout.trim_end(), src_id, "stdout must be the snapshot id");

    // `--jobs` alone must not trigger the banner; only `--verbose` may.
    assert!(
        !stderr.contains("concurrent") && !stderr.contains("transfers:"),
        "non-verbose run must not emit the transfer-config banner:\n{stderr}"
    );
}

/// `pull --jobs N --verbose` (fetch + checkout) emits the transfer-config banner
/// exactly ONCE, not once per leg.
///
/// stderr is decoded before the exit status is checked so that a failing run
/// reports the binary's own diagnostics.
#[test]
fn verbose_jobs_pull_reports_once() {
    let cache = TempDir::new().unwrap();
    let src = TempDir::new().unwrap();
    let store = TempDir::new().unwrap();
    let dest = TempDir::new().unwrap();
    build_tree(&src);

    let src_str = src.path().to_string_lossy().into_owned();
    let dest_str = dest.path().to_string_lossy().into_owned();
    let store_url = format!("file://{}", store.path().display());

    // Seed the store with a plain (non-verbose) push; its stdout is the id
    // that the pull below asks for.
    let src_id = stdout_ok(cache.path(), &["push", "--store", &store_url, &src_str]);

    let out = snapdir(cache.path())
        .args([
            "pull",
            "--jobs",
            "4",
            "--verbose",
            "--store",
            &store_url,
            "--id",
            &src_id,
            &dest_str,
        ])
        .output()
        .expect("run snapdir");

    let status = out.status;
    let stderr = String::from_utf8(out.stderr).expect("snapdir stderr must be valid UTF-8");
    assert!(
        status.success(),
        "verbose pull must succeed ({status}); stderr:\n{stderr}"
    );

    assert!(
        stderr.contains("transfers: 4 concurrent"),
        "pull --verbose must report concurrency:\n{stderr}"
    );
    assert_eq!(
        stderr.matches("transfers:").count(),
        1,
        "pull must print the transfer-config banner exactly once:\n{stderr}"
    );
}