1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
use super::critical_path::is_likely_native_build;
use super::git_prepare::{prepare_scratch_copy, run_git_dep_prepare};
use super::lifecycle::run_import_on_blocking;
use super::settings::{
default_lockfile_network_concurrency, resolve_network_concurrency,
resolve_strict_store_integrity, resolve_strict_store_pkg_content_check,
resolve_verify_store_integrity,
};
use crate::commands::{resolve_virtual_store_dir, resolve_virtual_store_dir_max_length};
use crate::progress::InstallProgress;
use aube_lockfile::dep_path_filename::dep_path_to_filename;
use miette::{Context, IntoDiagnostic, miette};
use rayon::prelude::*;
use std::collections::BTreeMap;
/// Materialize a non-registry package (`file:`, `link:`, git, or remote
/// tarball) into the store.
///
/// - `Directory` walks the target and hash-imports every file.
/// - `Tarball` reads the local `.tgz` and reuses the normal tarball
///   importer.
/// - `Git` materializes the lockfile-pinned commit (hosted codeload
///   tarball when available, `git clone` otherwise), narrows to an
///   optional `&path:` subdirectory, runs the repo's `prepare` script on
///   a scratch copy unless `ignore_scripts` is set, and imports the
///   result.
/// - `RemoteTarball` downloads the URL with `client`, verifies the
///   lockfile integrity when one is recorded, and imports the bytes.
/// - `Link` returns `None` because link deps never have a store-backed
///   index — the linker symlinks directly to the target in step 2.
///
/// `client` is optional for git deps (without it the codeload fast path
/// is skipped and the dep is cloned) but required for `RemoteTarball`.
/// `pkg_name` / `pkg_version` are only used to look up the dependency
/// chain that is appended to error messages.
///
/// # Errors
///
/// Returns an error when the local source is missing or unreadable, when
/// a clone / fetch / prepare / import step fails, when a git `&path:`
/// subpath is absent or escapes the clone root, or when a remote tarball
/// fails integrity verification. Remote-tarball URLs are passed through
/// `redact_url` before they appear in error messages.
#[allow(clippy::too_many_arguments)]
pub(super) async fn import_local_source(
    store: &std::sync::Arc<aube_store::Store>,
    project_root: &std::path::Path,
    local: &aube_lockfile::LocalSource,
    client: Option<&std::sync::Arc<aube_registry::client::RegistryClient>>,
    ignore_scripts: bool,
    git_prepare_depth: u32,
    inherited_build_policy: Option<std::sync::Arc<aube_scripts::BuildPolicy>>,
    git_shallow_hosts: &[String],
    pkg_name: &str,
    pkg_version: &str,
) -> miette::Result<Option<aube_store::PackageIndex>> {
    // `chain` is appended to per-error messages below so users see
    // *why* a `file:` / `link:` / git / remote-tarball dep was pulled
    // in. Empty when the package isn't in the resolved chain index
    // (e.g. when the install pipeline hasn't seeded one yet for an
    // out-of-band caller).
    let chain = crate::dep_chain::format_chain_for(pkg_name, pkg_version);
    use aube_lockfile::LocalSource;
    match local {
        LocalSource::Link(_) => Ok(None),
        LocalSource::Directory(rel) => {
            let abs = project_root.join(rel);
            if !abs.is_dir() {
                return Err(miette!(
                    "local dependency {}: {} is not a directory{chain}",
                    local.specifier(),
                    abs.display()
                ));
            }
            let index = store
                .import_directory(&abs)
                .map_err(|e| miette!("failed to import {}: {e}{chain}", local.specifier()))?;
            Ok(Some(index))
        }
        LocalSource::Tarball(rel) => {
            let abs = project_root.join(rel);
            let bytes = std::fs::read(&abs)
                .into_diagnostic()
                .wrap_err_with(|| format!("read {}{chain}", abs.display()))?;
            let index = store
                .import_tarball(&bytes)
                .map_err(|e| miette!("failed to import {}: {e}{chain}", local.specifier()))?;
            Ok(Some(index))
        }
        LocalSource::Git(g) => {
            // Materialize the git dep into a commit-keyed cache
            // directory and hardlink-import into the store exactly
            // like a `file:` directory. The resolver already pinned
            // `g.resolved` to a full commit SHA, so we route through
            // the same hosted-tarball-then-clone fallback npm and
            // pnpm use:
            //
            //   1. github / gitlab / bitbucket public reads → a flat
            //      HTTPS tarball over codeload (no `git` binary, no
            //      SSH key required).
            //   2. Anything else, plus codeload errors → shallow
            //      `git clone` over HTTPS (rewritten from the stored
            //      lockfile URL when the host is hosted, so an
            //      `git+ssh://git@github.com/…` lockfile still works
            //      on a host with no SSH key).
            //   3. Non-hosted hosts → unchanged: clone whatever URL
            //      the lockfile recorded, preserving SSH-only setups.
            //
            // Both the codeload extract and the clone share the
            // `(url, commit)` cache so the resolver's earlier call
            // for the same dep doesn't pay the network round-trip
            // twice.
            let url = g.url.clone();
            let resolved = g.resolved.clone();
            let spec = local.specifier();
            let hosted = aube_lockfile::parse_hosted_git(&url);
            let runtime_url = hosted
                .as_ref()
                .map(|h| h.https_url())
                .unwrap_or_else(|| url.clone());
            let codeload_url = hosted.as_ref().and_then(|h| h.tarball_url(&resolved));
            // Cache hit fast path: skip the HTTPS round-trip when the
            // resolver already populated the codeload cache for this
            // (url, commit) pair earlier in the install. Mirrors
            // `git_shallow_clone`'s top-of-function reuse check.
            let mut clone_dir: Option<std::path::PathBuf> = if codeload_url.is_some() {
                aube_store::codeload_cache_lookup(&url, &resolved).map(|(dir, _)| dir)
            } else {
                None
            };
            if clone_dir.is_none()
                && let (Some(c), Some(url_to_fetch)) = (client, codeload_url.as_deref())
            {
                match c.fetch_tarball_bytes(url_to_fetch).await {
                    Ok(bytes) => {
                        let bytes_vec = bytes.to_vec();
                        let url_for_extract = url.clone();
                        let resolved_for_extract = resolved.clone();
                        // Extraction is blocking filesystem work, so keep
                        // it off the async executor threads.
                        match tokio::task::spawn_blocking(move || {
                            aube_store::extract_codeload_tarball(
                                &bytes_vec,
                                &url_for_extract,
                                &resolved_for_extract,
                            )
                        })
                        .await
                        .map_err(|e| miette!("codeload extract task panicked: {e}{chain}"))?
                        {
                            Ok((dir, _sha)) => clone_dir = Some(dir),
                            Err(e) => {
                                // Best-effort fast path: a bad archive
                                // just falls through to `git clone`.
                                tracing::debug!(
                                    %spec,
                                    "codeload extract failed, falling back to git clone: {e}",
                                );
                            }
                        }
                    }
                    Err(e) => {
                        tracing::debug!(
                            %spec,
                            url = %aube_util::url::redact_url(url_to_fetch),
                            "codeload fetch failed, falling back to git clone: {e}",
                        );
                    }
                }
            }
            let clone_dir = if let Some(dir) = clone_dir {
                dir
            } else {
                let shallow = aube_store::git_host_in_list(&runtime_url, git_shallow_hosts);
                let url_for_clone = runtime_url.clone();
                let resolved_for_clone = resolved.clone();
                let (dir, _head_sha) = tokio::task::spawn_blocking(move || {
                    aube_store::git_shallow_clone(&url_for_clone, &resolved_for_clone, shallow)
                })
                .await
                .map_err(|e| miette!("git clone task panicked: {e}{chain}"))?
                .map_err(|e| miette!("failed to clone {spec}: {e}{chain}"))?;
                dir
            };
            // `&path:/<sub>` narrows the package root to a
            // subdirectory of the cloned repo (pnpm-compatible).
            // Everything below this line — manifest read, prepare
            // scratch copy, archive build, plain directory import —
            // operates on the subdir rather than the whole clone.
            //
            // Defense in depth against a `..`-laden subpath: the
            // parser already rejects them, but we also canonicalize
            // and assert the result stays under `clone_dir` so a
            // future code path that fills `subpath` from a different
            // source can't bypass the check.
            let pkg_root = match &g.subpath {
                Some(sub) => clone_dir.join(sub),
                None => clone_dir.clone(),
            };
            if !pkg_root.is_dir() {
                return Err(miette!(
                    "git dep {spec}: subpath {} not found in clone{chain}",
                    pkg_root.display()
                ));
            }
            if g.subpath.is_some() {
                let canonical_clone = clone_dir
                    .canonicalize()
                    .into_diagnostic()
                    .wrap_err_with(|| format!("canonicalize clone dir for {spec}{chain}"))?;
                let canonical_pkg = pkg_root
                    .canonicalize()
                    .into_diagnostic()
                    .wrap_err_with(|| format!("canonicalize subpath for {spec}{chain}"))?;
                if !canonical_pkg.starts_with(&canonical_clone) {
                    return Err(miette!(
                        "git dep {spec}: subpath {} escapes clone root {}{chain}",
                        canonical_pkg.display(),
                        canonical_clone.display()
                    ));
                }
            }
            // If the cloned repo defines a `prepare` script, treat
            // it as a source checkout that needs to be built before
            // we snapshot it. Matches npm/pnpm: a TypeScript repo
            // installed from git has devDependencies + a `prepare`
            // that compiles `src/` into `dist/`, and consumers
            // expect the built output. We run a nested `aube
            // install` inside the clone, which installs its deps
            // and runs its own root lifecycle hooks (including
            // `prepare`), then `aube pack`'s file-selection logic
            // snapshots exactly what would be published (honors
            // `files`, `.npmignore`, and skips `node_modules`).
            //
            // `--ignore-scripts` short-circuits the whole branch:
            // the only reason we'd pay the cost of a nested install
            // is to run `prepare`, so with scripts disabled we fall
            // through to the plain directory import. Matches pnpm,
            // which skips `prepare` for git deps under
            // `--ignore-scripts` as well.
            let manifest_path = pkg_root.join("package.json");
            let needs_prepare = !ignore_scripts
                && aube_manifest::PackageJson::from_path(&manifest_path)
                    .ok()
                    .is_some_and(|pj| pj.scripts.contains_key("prepare"));
            if needs_prepare {
                // Run `prepare` on a private copy of the checkout,
                // not on the shared `git_shallow_clone` cache
                // directory. The cache is keyed by (url, commit)
                // and reused across installs; mutating it in place
                // would leave `node_modules/`, `aube-lock.yaml`,
                // and any generated `dist/` behind, so a later
                // `aube install --ignore-scripts` — which falls
                // through to the plain directory-import path —
                // would silently pull those build artifacts into
                // the store even though the user asked for a
                // scripts-free install. Copying also isolates
                // concurrent installs of the same git dep from
                // clobbering each other's in-progress prepare.
                //
                // `ScratchDir` removes the copy on drop, including
                // on the error path.
                let scratch = prepare_scratch_copy(&pkg_root, &spec)?;
                run_git_dep_prepare(
                    scratch.path(),
                    &spec,
                    ignore_scripts,
                    git_prepare_depth,
                    inherited_build_policy,
                )
                .await?;
                let archive = crate::commands::pack::build_archive(scratch.path())
                    .wrap_err_with(|| format!("failed to pack prepared git dep {spec}{chain}"))?;
                let index = store
                    .import_tarball(&archive.tarball)
                    .map_err(|e| miette!("failed to import prepared {spec}: {e}{chain}"))?;
                return Ok(Some(index));
            }
            let index = store
                .import_directory(&pkg_root)
                .map_err(|e| miette!("failed to import {}: {e}{chain}", local.specifier()))?;
            Ok(Some(index))
        }
        LocalSource::RemoteTarball(t) => {
            // Remote tarball URL: download once, verify the
            // resolver-pinned integrity, and import like any other
            // .tgz. Reuses the normal tarball importer so the
            // linker sees a plain PackageIndex. No store-level
            // index cache lookup — the canonical key would need to
            // be `(url, integrity)` rather than `(name, version)`
            // and remote tarball deps are rare enough that the
            // redundant walk isn't worth a new cache namespace.
            let client = client.ok_or_else(|| {
                miette!(
                    "internal: import_local_source called without a registry client for {}{chain}",
                    local.specifier()
                )
            })?;
            // Redact the URL here just like the warning and the
            // integrity-error paths below do: tarball URLs can carry
            // credentials (userinfo or token query parameters) that
            // must not leak into terminal output or CI logs.
            let bytes = client.fetch_tarball_bytes(&t.url).await.map_err(|e| {
                miette!(
                    "failed to fetch {}: {e}{chain}",
                    aube_util::url::redact_url(&t.url)
                )
            })?;
            if t.integrity.is_empty() {
                tracing::warn!(
                    code = aube_codes::warnings::WARN_AUBE_MISSING_INTEGRITY,
                    url = %aube_util::url::redact_url(&t.url),
                    "remote tarball lockfile entry has no integrity field; importing fetched bytes without verification (run `aube install --no-frozen-lockfile` to refresh the lockfile)",
                );
            } else {
                aube_store::verify_integrity(&bytes, &t.integrity)
                    .map_err(|e| miette!("{}: {e}{chain}", aube_util::url::redact_url(&t.url)))?;
            }
            let index = store
                .import_tarball(&bytes)
                .map_err(|e| miette!("failed to import {}: {e}{chain}", local.specifier()))?;
            Ok(Some(index))
        }
    }
}
/// Fetch tarballs for the packages in `packages`, reusing store
/// index-cache hits where possible.
///
/// This is the standalone entry point that lets sibling commands
/// (notably `aube fetch`) reuse the lockfile path's parallel download,
/// integrity check and index-cache pipeline. It resolves the project's
/// store and network settings from the project root (or the current
/// directory), then delegates to `fetch_packages_with_root`.
///
/// Returns whatever `fetch_packages_with_root` returns: the
/// `dep_path → PackageIndex` map plus two counters.
pub(in crate::commands) async fn fetch_packages(
    packages: &BTreeMap<String, aube_lockfile::LockedPackage>,
    store: &std::sync::Arc<aube_store::Store>,
    client: std::sync::Arc<aube_registry::client::RegistryClient>,
    progress: Option<&InstallProgress>,
    ignore_scripts: bool,
    git_prepare_depth: u32,
    git_shallow_hosts: Vec<String>,
) -> miette::Result<(BTreeMap<String, aube_store::PackageIndex>, usize, usize)> {
    let project_dir = crate::dirs::project_root_or_cwd()?;

    // Settings are resolved here, in the wrapper, rather than in
    // `aube fetch`'s own entry point. That keeps this standalone path
    // and the lockfile install path on the same hardcoded fallbacks
    // whenever a setting is left unconfigured.
    let sources = crate::commands::FileSources::load(&project_dir);
    let workspace_raw = aube_manifest::workspace::load_both(&project_dir)
        .map(|(_parsed, raw)| raw)
        .unwrap_or_default();
    let env_vars = aube_settings::values::process_env();
    let settings = sources.ctx(&workspace_raw, env_vars, &[]);

    let network_concurrency = resolve_network_concurrency(&settings);
    let verify_integrity = resolve_verify_store_integrity(&settings);
    let strict_integrity = resolve_strict_store_integrity(&settings);
    let strict_pkg_content_check = resolve_strict_store_pkg_content_check(&settings);
    let entry_name_max_len = resolve_virtual_store_dir_max_length(&settings);
    let virtual_store = resolve_virtual_store_dir(&settings, &project_dir);

    // `aube fetch` exists only to download tarballs, so the client is
    // built eagerly by the caller; the lazy-builder closure simply
    // hands it over.
    let eager_client = move || client;

    // The already-linked shortcut is always skipped: `aube fetch` must
    // verify/populate the global store itself. A stale
    // `node_modules/.aube/<dep>` left by an earlier install must not
    // short-circuit that, or a setup that wipes the global store but
    // keeps `node_modules/` (e.g. Docker layer caching with the two in
    // different cached layers) would end up with an empty store.
    fetch_packages_with_root(
        packages,
        store,
        eager_client,
        progress,
        &project_dir,
        &virtual_store,
        /*materialize_tx=*/ None,
        /*skip_already_linked_shortcut=*/ true,
        entry_name_max_len,
        ignore_scripts,
        network_concurrency,
        verify_integrity,
        strict_integrity,
        strict_pkg_content_check,
        git_prepare_depth,
        /*inherited_build_policy=*/ None,
        git_shallow_hosts,
    )
    .await
}
// `network_concurrency`: seed for the adaptive tarball-fetch limiter.
// `None` falls back to `default_lockfile_network_concurrency()`. In both
// cases the seed is clamped to 64..=128 before use, so the
// `networkConcurrency` setting (resolved once at the install-run entry
// point) sets the starting concurrency rather than a hard cap.
// `verify_integrity`: whether to verify each tarball's SHA-512 against
// its lockfile integrity before importing into the store. `false`
// skips the check entirely; corresponds to `verifyStoreIntegrity=false`.
// `strict_pkg_content_check`: whether to validate that the imported
// tarball's `package.json` advertises the same (name, version) the
// resolver requested. `true` (pnpm default) rejects mismatches before
// linking; corresponds to `strictStorePkgContentCheck=true`.
#[allow(clippy::too_many_arguments)]
pub(super) async fn fetch_packages_with_root<F>(
packages: &BTreeMap<String, aube_lockfile::LockedPackage>,
store: &std::sync::Arc<aube_store::Store>,
client: F,
progress: Option<&InstallProgress>,
project_root: &std::path::Path,
aube_dir: &std::path::Path,
// Some streams every successful (dep_path, index) so a concurrent
// GVS-prewarm materializer can start reflinks before the full
// batch finishes. None keeps batch-then-return for `aube fetch`.
// Sender drops on function exit so consumer sees channel close.
materialize_tx: Option<tokio::sync::mpsc::Sender<(String, aube_store::PackageIndex)>>,
// When true, every package classifies as `Cached` or `NeedsFetch`
// based on `store.load_index`, regardless of whether
// `.aube/<dep>` already exists on disk. Callers pass true when
// either:
//
// - the linker will wipe `node_modules/` before running
// (`link_workspace`), so the `AlreadyLinked` classification
// would be immediately invalidated; or
// - the caller needs `load_index` to actually run as its store
// verification step (`aube fetch`, which treats the act of
// walking the store-file existence check as the operation's
// primary side effect).
//
// Both cases share the same implementation: skip the `.aube/`
// existence check entirely so every package goes through
// `store.load_index` → either `Cached` (store has it) or
// `NeedsFetch` (store is missing the file, download fresh).
skip_already_linked_shortcut: bool,
virtual_store_dir_max_length: usize,
ignore_scripts: bool,
network_concurrency: Option<usize>,
verify_integrity: bool,
strict_integrity: bool,
strict_pkg_content_check: bool,
git_prepare_depth: u32,
inherited_build_policy: Option<std::sync::Arc<aube_scripts::BuildPolicy>>,
git_shallow_hosts: Vec<String>,
) -> miette::Result<(BTreeMap<String, aube_store::PackageIndex>, usize, usize)>
where
F: FnOnce() -> std::sync::Arc<aube_registry::client::RegistryClient>,
{
// No-op fast path: for every package whose per-project
// `node_modules/.aube/<dep_path>` entry already resolves to an
// existing target, skip the package-index load entirely. The
// linker's only consumer of a `PackageIndex` is
// `materialize_into` — if the package is already materialized
// (either as a real directory here in per-project mode, or as a
// symlink into the global virtual store that itself exists),
// there's nothing to materialize and the 13–15 KB JSON on disk at
// `<store>/v1/index/<name>@<ver>.json` would be read for
// nothing. A fresh no-op install against the 1.4k-package medium
// fixture drops from ~38 ms of parallel index reads to a handful
// of `stat(2)`s.
//
// Two call sites disable the fast path entirely via
// `skip_already_linked_shortcut=true`:
//
// - **Workspace installs.** `link_workspace` unconditionally
// wipes `node_modules/` (including `.aube/`) before
// rebuilding, so every `AlreadyLinked` classification would
// be invalidated by the time the linker runs. With the fast
// path enabled, the linker would then fall back to
// `self.store.load_index` *serially* inside `link_workspace`'s
// for-loop, which is strictly slower than loading them here
// in parallel via rayon.
//
// - **`aube fetch`.** The command exists to populate the
// global store (typical use: Docker layer caching, warming
// a CI mirror, or recovering from a wiped aube store).
// If `node_modules/.aube/<dep>` happens to exist from a
// previous install, the `AlreadyLinked` shortcut would skip
// both `load_index` and the tarball fetch — which silently
// leaves the store empty even though the user explicitly
// asked for it to be repopulated. Disabling the shortcut
// makes every package flow through `store.load_index`,
// which does a first-file existence check on the CAS and
// correctly downgrades to `NeedsFetch` when the store entry
// has been wiped.
//
// `Path::exists` follows symlinks, so a per-project entry pointing
// at a global virtual-store target that no longer exists correctly
// falls through to the slow path. The linker re-derives the entry
// name through `aube_dir_entry_name(dep_path)`, which is just
// `dep_path_to_filename(dep_path, max_length)` — we take the max
// length as a parameter (instead of reaching for
// `DEFAULT_VIRTUAL_STORE_DIR_MAX_LENGTH`) so the fast path checks
// the exact same filename the linker will write. The install
// driver (and the `aube fetch` wrapper) both resolve this through
// `super::resolve_virtual_store_dir_max_length(&ctx)` so user
// overrides of `virtualStoreDirMaxLength` flow to both sites and
// we can't produce "the fast path saw `.aube/<X>` but the linker
// expected `.aube/<Y>`" bugs on dep_paths long enough to trigger
// the truncate-and-hash fallback inside `dep_path_to_filename`.
// `aube_dir` is threaded in from
// `commands::resolve_virtual_store_dir` so custom `virtualStoreDir`
// values land on the same path the linker will write to.
enum CheckResult {
/// Already linked into `node_modules/.aube/<dep_path>`. The
/// linker won't need the package index for this dep at all.
AlreadyLinked,
/// Store has the index; linker will reuse it to (re)create any
/// missing symlinks.
Cached(aube_store::PackageIndex),
/// Missing from the store — falls through to the tarball fetch.
NeedsFetch,
}
// Parallel index check (rayon)
let check_results: Vec<_> = packages
.par_iter()
.filter(|(_, pkg)| pkg.local_source.is_none())
.map(|(dep_path, pkg)| {
if !skip_already_linked_shortcut {
let entry_name = dep_path_to_filename(dep_path, virtual_store_dir_max_length);
if aube_dir.join(&entry_name).exists() {
return (dep_path.clone(), pkg, CheckResult::AlreadyLinked);
}
}
// Keyed by registry name so two npm-aliases of the same
// real package share one store index entry instead of
// wastefully double-fetching under the alias. Integrity
// is part of the cache key so a different tarball served
// under the same (name, version) — e.g. a github codeload
// archive vs. the npm-published bytes — can't return the
// wrong file list.
//
// `_verified` because the index cache and the CAS shards
// live in separate paths until the v1/index/ migration
// completes on disk, and external systems can drift them
// apart even after (Docker BuildKit cache mounts that
// only cover one path, foreign sync tools, partial wipes
// mid-install). Stat-per-file is paid only on a cache hit;
// a stale index drops here and falls through to `NeedsFetch`,
// which re-fetches the tarball cleanly — the alternative is
// the materializer dying mid-link with `ERR_AUBE_MISSING_STORE_FILE`,
// forcing the user to retry the whole install.
match store.load_index_verified(
pkg.registry_name(),
&pkg.version,
pkg.integrity.as_deref(),
) {
Some(index) => (dep_path.clone(), pkg, CheckResult::Cached(index)),
None => (dep_path.clone(), pkg, CheckResult::NeedsFetch),
}
})
.collect();
let mut indices: BTreeMap<String, aube_store::PackageIndex> = BTreeMap::new();
// Remote tarball deps need a registry client to download the
// bits during `import_local_source`. Build it eagerly when any
// package has a RemoteTarball source so the local-import loop
// can share a single reqwest client with the fetch branch
// below. Projects without URL tarballs still get the lazy
// construction path in the `to_fetch` branch.
let has_remote_tarball = packages.values().any(|p| {
matches!(
p.local_source,
Some(aube_lockfile::LocalSource::RemoteTarball(_))
)
});
let mut client_slot: Option<std::sync::Arc<aube_registry::client::RegistryClient>> = None;
let mut client_builder = Some(client);
if has_remote_tarball {
client_slot = Some((client_builder.take().unwrap())());
}
// Local (`file:` / `link:`) packages: import directories or
// tarballs straight into the store so the linker has a
// PackageIndex to walk. Link-only deps don't get an index.
for (dep_path, pkg) in packages {
let Some(ref local) = pkg.local_source else {
continue;
};
// Credit every local dep against the overall progress total —
// the total was seeded with `graph.packages.len()`, which
// includes `link:` packages even though they have no
// store-backed index. Skipping the `inc` for `None` would
// stall the bar below 100% for any project with a link dep.
if let Some(index) = import_local_source(
store,
project_root,
local,
client_slot.as_ref(),
ignore_scripts,
git_prepare_depth,
inherited_build_policy.clone(),
&git_shallow_hosts,
&pkg.name,
&pkg.version,
)
.await?
{
indices.insert(dep_path.clone(), index);
}
if let Some(p) = progress {
p.inc_reused(1);
}
}
// Cap by check_results upper bound. Worst case fits in one alloc.
let mut to_fetch = Vec::with_capacity(check_results.len());
let mut cached_count = 0usize;
for (dep_path, pkg, result) in check_results {
match result {
CheckResult::AlreadyLinked => {
// No `indices` entry: the linker takes the
// already-materialized fast path and never touches the
// index map for this dep_path.
cached_count += 1;
}
CheckResult::Cached(index) => {
// Don't stream Cached items through the materializer.
// The link phase fast-paths them via pkg_nm_dir.exists()
// anyway, so the per-pkg spawn pair was pure overhead
// on warm-cache installs (1400-pkg fixture saw +66%
// wall time before the skip).
indices.insert(dep_path, index);
cached_count += 1;
}
CheckResult::NeedsFetch => {
// `registry_name` is the real package name on the
// registry — equal to `name` for the common case, and
// the aliased-real-name for npm-alias entries. The
// tarball URL override is only present for aliased
// entries where `client.tarball_url(&name, ...)` would
// 404 the alias-qualified name; the lockfile reader
// populated it from `resolved:` at parse time.
to_fetch.push((
dep_path,
pkg.name.clone(),
pkg.registry_name().to_string(),
pkg.version.clone(),
pkg.tarball_url.clone(),
pkg.integrity.clone(),
));
}
}
}
// Credit cached packages against the overall counter immediately — only
// the to_fetch set produces visible child rows.
if let Some(p) = progress {
p.inc_reused(cached_count);
}
// Critical-path fetch order: float likely-native-build packages
// (sharp, esbuild, @swc/*, sqlite3, lmdb, bcrypt, etc) to the
// front of the queue. These packages run prebuild/node-gyp at
// install time, and starting their fetch first lets the build
// step pipeline with subsequent fetches instead of blocking on
// the tail. `sort_by_key` is stable so non-native packages keep
// their lockfile-discovery order; only the natives jump ahead.
// `AUBE_DISABLE_CRITICAL_PATH=1` reverts to the previous order
// for byte-identity comparison runs.
if std::env::var_os("AUBE_DISABLE_CRITICAL_PATH").is_none() {
to_fetch
.sort_by_key(|(_, _, registry_name, _, _, _)| !is_likely_native_build(registry_name));
}
let fetch_count = to_fetch.len();
let mut lockfile_persist_handle: Option<(
std::sync::Arc<aube_util::adaptive::PersistentState>,
std::sync::Arc<aube_util::adaptive::AdaptiveLimit>,
)> = None;
if !to_fetch.is_empty() {
// Only build the reqwest+TLS client now that we know we
// actually need to fetch tarballs. On a warm no-op install
// everything classifies as `AlreadyLinked` / `Cached` and this
// closure is never called — the previous eager construction
// cost ~22 ms on Linux just to create a client that never
// sent a single request.
let client = match client_slot.take() {
Some(c) => c,
None => (client_builder.take().unwrap())(),
};
/*
* Adaptive concurrency on the lockfile driven fetch path
* (frozen / fetch / ci / matched lockfile). Same gradient
* controller as the streaming resolver fetch path.
* `networkConcurrency` setting acts as the seed when set.
* Cross run persisted under `tarball:default` so this path
* shares its converged operating point with the streaming
* tarball path.
*/
let sem_seed = network_concurrency.unwrap_or_else(default_lockfile_network_concurrency);
let lockfile_persistent = aube_util::adaptive::global_persistent_state();
let semaphore = match lockfile_persistent.as_ref() {
Some(state) => aube_util::adaptive::AdaptiveLimit::from_persistent(
state,
"tarball:default",
sem_seed.clamp(64, 128),
4,
256,
),
None => aube_util::adaptive::AdaptiveLimit::new(sem_seed.clamp(64, 128), 4, 256),
};
if let Some(state) = lockfile_persistent.clone() {
lockfile_persist_handle = Some((state, std::sync::Arc::clone(&semaphore)));
}
// Hoist env-driven flags out of the per-tarball closure so
// the libc lock fires once instead of N times on a 1000-pkg
// install.
let streaming_sha512_enabled = std::env::var_os("AUBE_DISABLE_STREAMING_SHA512").is_none();
let tarball_stream_enabled = std::env::var_os("AUBE_DISABLE_TARBALL_STREAM").is_none();
// JoinSet so a first-error path aborts the sibling fetches
// instead of detaching them into the background. Detached
// tasks keep writing to the CAS after the install command
// has already errored out.
let mut handles: tokio::task::JoinSet<miette::Result<(String, aube_store::PackageIndex)>> =
tokio::task::JoinSet::new();
for (dep_path, display_name, registry_name, version, tarball_url_override, integrity) in
to_fetch
{
let sem = semaphore.clone();
let store = store.clone();
let client = client.clone();
let row = progress.map(|p| p.start_fetch(&display_name, &version));
let bytes_progress = progress.cloned();
handles.spawn(async move {
let _row = row;
let task_start = std::time::Instant::now();
let permit = sem.acquire().await;
let wait_time = task_start.elapsed();
// Aliased entries (`"h3-v2": "npm:h3@..."`) carry the
// resolved tarball URL verbatim from the lockfile so
// we skip re-deriving it from `registry_name` — the
// lockfile captured the exact URL at write time
// against whatever registry was active then.
let url = tarball_url_override
.clone()
.unwrap_or_else(|| client.tarball_url(®istry_name, &version));
let dl_start = std::time::Instant::now();
// Stream when env enabled and SRI is SHA-512 (or
// absent). Streaming verify can't re-hash with
// another algo, so non-SHA-512 takes the buffered
// path below.
let stream_eligible = tarball_stream_enabled
&& integrity
.as_deref()
.is_none_or(|s| s.starts_with("sha512-"));
if stream_eligible {
let streamed = crate::commands::install::lifecycle::fetch_and_import_tarball_streaming(
&client,
&store,
&url,
&display_name,
®istry_name,
&version,
integrity.as_deref(),
verify_integrity,
strict_integrity,
strict_pkg_content_check,
)
.await;
let (index, bytes_len) = match streamed {
Ok(v) => {
permit.record_success();
v
}
Err(e) => {
if e.is_throttle {
permit.record_throttle();
} else {
permit.record_cancelled();
}
return Err(e.into());
}
};
let dl_time = dl_start.elapsed();
if let Some(p) = bytes_progress.as_ref() {
p.inc_downloaded_bytes(bytes_len);
}
tracing::trace!(
"fetch (stream) {display_name}@{version}: wait={:.0?} total={:.0?} ({} bytes)",
wait_time,
dl_time,
bytes_len
);
return Ok::<_, miette::Report>((dep_path, index));
}
// Buffered SHA-512 path. Streaming SHA-512 hashes
// chunks during the read loop, so import_verified
// skips its hash pass and compares directly.
// AUBE_DISABLE_STREAMING_SHA512=1 reverts to the
// buffered-then-hash path.
let fetch_outcome = if streaming_sha512_enabled {
client
.fetch_tarball_bytes_streaming_sha512(&url)
.await
.map(|(b, d)| (b, Some(d)))
.map_err(|e| {
let throttled = e.is_throttle();
(
miette!(
"failed to fetch {display_name}@{version}: {e}{}",
crate::dep_chain::format_chain_for(&display_name, &version)
),
throttled,
)
})
} else {
client.fetch_tarball_bytes(&url).await.map(|b| (b, None)).map_err(|e| {
let throttled = e.is_throttle();
(
miette!(
"failed to fetch {display_name}@{version}: {e}{}",
crate::dep_chain::format_chain_for(&display_name, &version)
),
throttled,
)
})
};
let (bytes, streamed_digest) = match fetch_outcome {
Ok(v) => {
permit.record_success();
v
}
Err((report, throttled)) => {
if throttled {
permit.record_throttle();
} else {
permit.record_cancelled();
}
return Err(report);
}
};
let dl_time = dl_start.elapsed();
if let Some(p) = bytes_progress.as_ref() {
p.inc_downloaded_bytes(bytes.len() as u64);
}
// Keep the semaphore permit through import, not just
// download. `import_tarball` fans out into gzip/tar
// decode, SHA-512, CAS writes, and index writes; on
// macOS/APFS, letting hundreds of completed downloads
// pile into Tokio's large blocking pool turns the
// cold-cache path into metadata contention. The
// semaphore is therefore the install-wide "download +
// import" pressure valve: enough concurrency to keep
// the network busy, but not enough to swamp the
// filesystem.
//
// Move CPU/blocking work (SHA-512 verify, tar extract,
// file writes, index cache write) onto the blocking
// thread pool so it doesn't starve the async runtime
// workers used for concurrent network I/O.
let bytes_len = bytes.len();
let (index, import_time) = run_import_on_blocking(
store.clone(),
bytes,
streamed_digest,
display_name.clone(),
registry_name.clone(),
version.clone(),
integrity.clone(),
verify_integrity,
strict_integrity,
strict_pkg_content_check,
)
.await?;
tracing::trace!(
"fetch {display_name}@{version}: wait={:.0?} dl={:.0?} ({} bytes) import={:.0?}",
wait_time,
dl_time,
bytes_len,
import_time
);
Ok::<_, miette::Report>((dep_path, index))
});
}
while let Some(joined) = handles.join_next().await {
let (dep_path, index) = joined.into_diagnostic()??;
if let Some(tx) = materialize_tx.as_ref() {
// Time channel send so back-pressure events show up in
// the trace. The materialize channel is bounded; if the
// consumer falls behind, `send().await` blocks until a
// permit frees, which is otherwise invisible in
// `fetch.tarball` totals.
let send_t0 = aube_util::diag::enabled().then(std::time::Instant::now);
tx.send((dep_path.clone(), index.clone()))
.await
.map_err(|_| {
miette!("materializer task exited before fetch_packages finished")
})?;
if let Some(t0) = send_t0 {
let elapsed = t0.elapsed();
if elapsed.as_millis() >= 1 {
aube_util::diag::event(
aube_util::diag::Category::Channel,
"materialize_send_wait",
elapsed,
None,
);
}
}
}
indices.insert(dep_path, index);
}
}
// Without explicit drop, consumer's rx.recv() loop hangs.
drop(materialize_tx);
if let Some((state, sem)) = lockfile_persist_handle {
sem.persist(&state, "tarball:default");
}
Ok((indices, cached_count, fetch_count))
}
/// Extract the bare version from a dep_path for display purposes.
///
/// A dep_path has the shape `name@1.2.3(peer@x)`. When `dep_path` begins
/// with `name@`, that prefix is dropped; then everything from the first `(`
/// onward (the peer-context suffix) is discarded, so warnings show `1.2.3`
/// rather than `1.2.3(peer@x)`. If `dep_path` does not begin with `name@`,
/// only the peer suffix is removed.
pub(super) fn version_from_dep_path(dep_path: &str, name: &str) -> String {
    // Matching `name` and then `@` separately is equivalent to matching the
    // combined `name@` prefix, without formatting a temporary String.
    let without_name = dep_path
        .strip_prefix(name)
        .and_then(|rest| rest.strip_prefix('@'))
        .unwrap_or(dep_path);
    let version = match without_name.split_once('(') {
        Some((head, _peer_suffix)) => head,
        None => without_name,
    };
    version.to_owned()
}
/// Re-key a canonical-indexed indices map to match the peer-contextualized
/// dep_paths in `graph`.
///
/// For each entry in `graph.packages`, the index is looked up first under
/// the entry's own dep_path and, if that misses, under its canonical
/// `spec_key()`. Each contextualized entry points at the same underlying
/// files as its canonical name@version, so the found index is cloned under
/// the contextualized dep_path. When canonical == contextualized (the
/// package has no peer deps), the direct lookup hits and this is a plain
/// copy. Graph entries found under neither key are omitted from the result.
pub(super) fn remap_indices_to_contextualized(
    canonical_indices: &BTreeMap<String, aube_store::PackageIndex>,
    graph: &aube_lockfile::LockfileGraph,
) -> BTreeMap<String, aube_store::PackageIndex> {
    let mut out = BTreeMap::new();
    for (dep_path, pkg) in &graph.packages {
        // Build the canonical key only when the direct lookup misses:
        // `spec_key()` yields an owned String, and for peer-free packages
        // the dep_path lookup already succeeds, so computing it eagerly
        // was a wasted allocation per package.
        let found = canonical_indices
            .get(dep_path)
            .or_else(|| canonical_indices.get(&pkg.spec_key()));
        if let Some(idx) = found {
            out.insert(dep_path.clone(), idx.clone());
        }
    }
    out
}