mkit_cli/remote_dispatch.rs
//! URL-scheme → `Transport` dispatch for `mkit push` / `mkit pull`.
//!
//! [`open`] wires all five shipping URL schemes: `mkit+file://`,
//! `mkit+https://` (and `mkit+http://` for local dev), `mkit+s3://`, and
//! `mkit+ssh://` — plus `mkit+enc://` when built with the `enc-transport`
//! feature. The memory transport is in-process only: callers construct a
//! `MemoryTransport` in-process (e.g. behind an `Arc`) and hand it to
//! [`push_all`] / [`pull_all`] directly instead of going through URL-based
//! construction. Integration tests in the `mkit-cli` crate exercise the
//! memory path this way.
//!
//! Credentials / environment sources:
//! - HTTP(S): optional `MKIT_API_TOKEN` bearer.
//! - S3/R2: `MKIT_R2_ACCESS_KEY_ID` + `MKIT_R2_SECRET_ACCESS_KEY` (plus
//!   optional `MKIT_R2_REGION`, default `auto`). Missing credentials do NOT
//!   fail at connect time; the first signed request returns
//!   `TransportError::AccessDenied`.
//! - SSH: spawns an `ssh(1)` subprocess, which inherits the user's agent,
//!   keys, and `~/.ssh/config`. Note that [`open`] calls plain
//!   `SshTransport::connect`; per-repo `.mkit/config` SSH options (host-key
//!   checking, known-hosts path, identity file) only take effect on paths
//!   that build the transport via `SshTransport::connect_with_options`.
//! - Encrypted (`mkit+enc://`, feature `enc-transport`): optional
//!   `MKIT_ENC_CLIENT_KEY` path to a stable raw 32-byte client key;
//!   otherwise a fresh ephemeral key is generated per process.
20
21use std::path::Path;
22use std::sync::Arc;
23
24use mkit_core::hash::Hash;
25use mkit_core::object::Object;
26use mkit_core::ops::merge::is_ancestor;
27use mkit_core::ops::reachable_objects;
28use mkit_core::ops::restore;
29use mkit_core::pack::{PackError, PackReader};
30use mkit_core::protocol::{PackKey, Transport, TransportError};
31use mkit_core::refs::{self, Head};
32use mkit_core::store::{ObjectStore, StoreError};
33use mkit_transport_file::FileTransport;
34use mkit_transport_http::HttpTransport;
35use mkit_transport_s3::S3Transport;
36use mkit_transport_ssh::{SshInitError, SshTransport};
37
/// Remote name used for the local remote-tracking namespace
/// (`refs/remotes/<remote>/…`) when a caller does not name a remote;
/// [`push_all_with`] falls back to it when `remote` is `None`.
const DEFAULT_REMOTE: &str = "default";
39
/// Errors returned by the push / pull helpers. Mapped to exit codes by
/// the commands themselves.
#[derive(Debug, thiserror::Error)]
pub enum DispatchError {
    /// [`open`] recognised the URL's scheme but will not dial it here
    /// (git-bridge `git+…` URLs, in-process `mkit+memory://`).
    #[error("unsupported URL scheme: {0}")]
    UnsupportedScheme(String),
    /// The URL matched none of the schemes [`open`] dispatches on.
    #[error("malformed URL: {0}")]
    MalformedUrl(String),
    /// There is no HEAD branch to push.
    #[error("no HEAD branch to push")]
    NoHead,
    /// A poll-loop checkpoint observed `signal::is_shutdown() == true`
    /// and aborted partway through. Callers should map this to
    /// `exit::TEMPFAIL` (75) so retries are safe — the transfer is
    /// half-finished but the remote is unmodified for any ref we
    /// hadn't reached yet.
    #[error("interrupted")]
    Interrupted,
    /// Error surfaced by the underlying [`Transport`].
    #[error("transport: {0}")]
    Transport(#[from] TransportError),
    /// Error reading or writing local refs / HEAD.
    #[error("refs: {0}")]
    Refs(#[from] refs::RefError),
    /// Failed to acquire the repository lock.
    #[error("repo lock: {0}")]
    RepoLock(#[from] mkit_core::repo_lock::LockError),
    /// Plain I/O failure.
    #[error("io: {0}")]
    Io(#[from] std::io::Error),
    /// Local object-store failure.
    #[error("store: {0}")]
    Store(#[from] StoreError),
    /// Failed to parse a downloaded packfile.
    #[error("pack: {0}")]
    Pack(#[from] PackError),
    /// `SshTransport::connect` failed (spawn or handshake).
    #[error("ssh init: {0}")]
    SshInit(#[from] SshInitError),
    /// [`pull_all`] found HEAD detached; it only fast-forwards a branch.
    #[error("pull requires HEAD to point at a branch")]
    DetachedHead,
    /// The named branch has no tip. NOTE(review): [`push_branch_tracked`]
    /// also returns this when the LOCAL branch is missing, so the
    /// "remote branch" wording can be misleading there.
    #[error("remote branch '{0}' not found")]
    RemoteBranchMissing(String),
    /// [`pull_all`]: the local tip is not an ancestor of the remote tip.
    #[error("pull would not fast-forward branch '{branch}'; merge or rebase first")]
    NonFastForwardPull { branch: String },
    /// The worktree restore-safety check refused, or the worktree restore
    /// itself failed (the message may also carry a rollback failure).
    #[error("restore safety: {0}")]
    RestoreSafety(String),
    /// An object expected to be commit-like was neither a commit nor a remix.
    #[error("object is not a commit")]
    NotCommit,
    /// Error from the restore machinery.
    #[error("restore: {0}")]
    Restore(#[from] restore::RestoreError),
    /// The per-endpoint credential-trust gate (#97) refused to build a
    /// credential-bearing transport for a repo-chosen endpoint the user
    /// has not explicitly trusted. The wrapped string is the actionable
    /// message produced by [`crate::config::endpoint_credential_trust`].
    #[error("{0}")]
    UntrustedRemote(String),
    /// A CAS ref write was rejected because the remote moved under us
    /// (non-fast-forward). Callers map this to an actionable
    /// fetch-then-retry / `--force-with-lease` hint.
    #[error(
        "updates were rejected for branch '{branch}' (non-fast-forward); fetch and merge first, or re-run with --force-with-lease / --force"
    )]
    NonFastForwardPush { branch: String },
}
97
98/// Open a transport for `endpoint` only after the per-endpoint
99/// credential-trust gate (#97) approves it.
100///
101/// This is the single choke point through which push / fetch / pull
102/// (and named-remote callers in #175) MUST build a transport: it runs
103/// [`crate::config::endpoint_credential_trust`] — keyed on the resolved
104/// ENDPOINT and its `repo_chosen` provenance — *before* constructing
105/// the transport, so a credential-bearing HTTP/S3 transport is never
106/// instantiated for a repo-chosen endpoint the user hasn't trusted.
107///
108/// `repo_chosen` is `true` when the endpoint came from repo-scoped
109/// config (the flat `remote_endpoint` or a `remote.<name>.url`),
110/// `false` when it came from the user / an explicit CLI argument. Trust
111/// is per ENDPOINT, never per remote name.
112pub fn open_trusted(
113 endpoint: &str,
114 repo_chosen: bool,
115 cfg: &crate::config::LayeredConfig,
116) -> Result<Arc<dyn Transport>, DispatchError> {
117 crate::config::endpoint_credential_trust(cfg, endpoint, repo_chosen)
118 .map_err(DispatchError::UntrustedRemote)?;
119 open(endpoint)
120}
121
122/// Open a transport for the given URL. Returns a type-erased `Arc`
123/// so callers can treat all schemes uniformly.
124///
125/// Low-level scheme dispatch only — it does NOT enforce the credential
126/// gate. Production push / fetch / pull paths go through
127/// [`open_trusted`]; `open` stays public for file/memory integration
128/// tests that have no ambient credentials to fence.
129pub fn open(url: &str) -> Result<Arc<dyn Transport>, DispatchError> {
130 if url.starts_with("git+") {
131 return Err(DispatchError::UnsupportedScheme(format!(
132 "'{url}' is a git-bridge remote — native push/pull/fetch/clone do not \
133 speak git transports; use `mkit git export` / `mkit git import` / \
134 `mkit git pull` (feature git-bridge)"
135 )));
136 }
137 if let Some(rest) = url.strip_prefix("mkit+file://") {
138 // mkit+file:///abs/path -> /abs/path
139 let path = Path::new(rest);
140 return Ok(Arc::new(FileTransport::new(path)));
141 }
142 if url.starts_with("mkit+memory://") {
143 // Memory transport is in-process; the URL-based path is not
144 // useful on its own but we accept it so `mkit remote add`
145 // round-trips cleanly.
146 return Err(DispatchError::UnsupportedScheme(
147 "mkit+memory:// must be driven via in-process harness (see tests)".to_string(),
148 ));
149 }
150 if url.starts_with("mkit+https://") || url.starts_with("mkit+http://") {
151 // HttpTransport::connect strips the `mkit+` prefix itself and
152 // reads MKIT_API_TOKEN from the environment.
153 let tx = HttpTransport::connect(url)?;
154 return Ok(Arc::new(tx));
155 }
156 if url.starts_with("mkit+s3://") {
157 // S3Transport::connect reads MKIT_R2_ACCESS_KEY_ID /
158 // MKIT_R2_SECRET_ACCESS_KEY from the environment. Missing
159 // credentials surface as AccessDenied on the first signed call,
160 // not at connect time.
161 let tx = S3Transport::connect(url)?;
162 return Ok(Arc::new(tx));
163 }
164 if url.starts_with("mkit+ssh://") {
165 // SshTransport::connect parses the URL, spawns `ssh(1)`, and
166 // performs the `Hello` / `HelloResponse` handshake. Any failure
167 // here tears the child down before returning, so callers never
168 // see a half-initialised transport.
169 let tx = SshTransport::connect(url)?;
170 return Ok(Arc::new(tx));
171 }
172 #[cfg(feature = "enc-transport")]
173 if url.starts_with("mkit+enc://") {
174 return open_enc(url);
175 }
176 Err(DispatchError::MalformedUrl(url.to_string()))
177}
178
/// Environment variable naming a raw 32-byte key file used as the
/// `mkit+enc://` dialer's STABLE client identity (issue #178).
///
/// An allowlisting server pins the dialer's static ed25519 key; pointing
/// this at a persistent key file keeps that public key stable across
/// restarts. It is a user-scoped / CLI-supplied path — never repo-local
/// `.mkit/config`, which `open_enc` has no access to anyway. When unset
/// (or empty) a fresh ephemeral key is drawn per process, which still
/// works against `--unsafe-allow-any-enc-peer` servers.
#[cfg(feature = "enc-transport")]
const ENC_CLIENT_KEY_ENV: &str = "MKIT_ENC_CLIENT_KEY";

/// `mkit+enc://` dispatch (Phase 2 of issue #156).
///
/// Parses the URL, resolves the dialer's static key via
/// [`load_or_ephemeral_client_key`] (keystore integration is
/// SPEC-TRANSPORT-ENC §6 item 5, still deferred), and runs the
/// encrypted-stream handshake over TCP against the URL-advertised server
/// public key.
///
/// # Errors
///
/// URL parse failures surface as the parser's [`TransportError`]; key
/// loading and connect/handshake failures are flattened into
/// [`TransportError::RemoteError`] carrying the underlying message.
#[cfg(feature = "enc-transport")]
fn open_enc(url: &str) -> Result<Arc<dyn Transport>, DispatchError> {
    use mkit_transport_enc::url::parse_enc_url;

    let target = parse_enc_url(url)?;
    let sk = load_or_ephemeral_client_key()?;
    let tx = mkit_transport_enc::connect_tcp(&target.host, target.port, &target.server_pubkey, sk)
        .map_err(|e| DispatchError::Transport(TransportError::RemoteError(e.to_string())))?;
    Ok(Arc::new(tx))
}
207
/// Resolve the `mkit+enc://` dialer's static ed25519 signing key.
///
/// A non-empty `MKIT_ENC_CLIENT_KEY` is treated as the path of a raw
/// 32-byte key file, read through `mkit_core::sign::load_raw_32` (with
/// its standard 0600/owner hardening), so the client's public key stays
/// stable and an allowlisting server can pin it across restarts. When the
/// variable is unset or empty, a fresh ephemeral key is drawn from the
/// system RNG via `getrandom` (≥256 bits) for back-compat with allow-any
/// servers.
#[cfg(feature = "enc-transport")]
fn load_or_ephemeral_client_key()
-> Result<commonware_cryptography::ed25519::PrivateKey, DispatchError> {
    use commonware_codec::DecodeExt as _;
    use commonware_cryptography::ed25519::PrivateKey;
    use zeroize::Zeroizing;

    // Every failure here is reported as a transport-level remote error.
    fn remote_err(msg: String) -> DispatchError {
        DispatchError::Transport(TransportError::RemoteError(msg))
    }

    match std::env::var_os(ENC_CLIENT_KEY_ENV) {
        Some(path) if !path.is_empty() => {
            let seed = mkit_core::sign::load_raw_32(std::path::Path::new(&path))
                .map_err(|e| remote_err(format!("load {ENC_CLIENT_KEY_ENV}: {e}")))?;
            PrivateKey::decode(seed.as_ref())
                .map_err(|e| remote_err(format!("client key construction failed: {e}")))
        }
        _ => {
            // Ephemeral fallback: the 32 random bytes live in `Zeroizing`
            // so this stack copy is scrubbed on drop; the `PrivateKey`
            // carries its own `Secret`-based zeroization.
            let mut secret = Zeroizing::new([0u8; 32]);
            getrandom::fill(secret.as_mut()).map_err(|e| remote_err(e.to_string()))?;
            PrivateKey::decode(secret.as_ref()).map_err(|e| remote_err(e.to_string()))
        }
    }
}
238
239/// Push every ref under `refs/heads/` to the remote, assembling a pack
240/// of every object reachable from the branch tip that the remote does
241/// not already hold. Returns the count of refs pushed.
242///
243/// Per-ref flow:
244/// 1. Resolve the local branch tip.
245/// 2. Walk reachable objects (`ops::reachable_objects`).
246/// 3. Filter out any object the remote already has via
247/// [`Transport::pack_exists`] (single-object packs, so the digest ==
248/// pack key).
249/// 4. Build a pack with `PackWriter`; the whole-pack digest is the
250/// `PackKey` used by [`Transport::upload_pack`].
251/// 5. Publish the ref with [`Transport::write_ref`].
252pub fn push_all(cwd: &Path, tx: &dyn Transport) -> Result<usize, DispatchError> {
253 push_all_with(cwd, tx, None, false)
254}
255
256/// CAS-aware mirror push (`mkit push --all`). Pushes every local
257/// `refs/heads/*` to the remote, using the remote-tracking ref under
258/// `refs/remotes/<remote>/<branch>` as the CAS lease (Missing when no
259/// tracking ref exists, Match otherwise). `force` upgrades every write
260/// to an unconditional `Any`. On success each pushed branch's
261/// remote-tracking ref is advanced to the pushed tip.
262///
263/// `remote` is the remote NAME used for the local tracking-ref
264/// namespace; `None` means the legacy `default`.
265pub fn push_all_with(
266 cwd: &Path,
267 tx: &dyn Transport,
268 remote: Option<&str>,
269 force: bool,
270) -> Result<usize, DispatchError> {
271 let mkit_dir = cwd.join(mkit_core::MKIT_DIR);
272 let store = crate::commands::open_store_configured(cwd)?;
273 let refs_list = refs::list_refs(&mkit_dir)?;
274 let remote = remote.unwrap_or(DEFAULT_REMOTE);
275 let mut n = 0;
276 for r in refs_list {
277 if crate::signal::is_shutdown() {
278 return Err(DispatchError::Interrupted);
279 }
280 let Some(h) = r.hash else { continue };
281 let condition = if force {
282 refs::RefWriteCondition::Any
283 } else {
284 match refs::read_remote_ref(&mkit_dir, remote, &r.name)? {
285 Some(tracked) => refs::RefWriteCondition::Match(tracked),
286 None => refs::RefWriteCondition::Missing,
287 }
288 };
289 push_branch(tx, &store, &r.name, h, condition)?;
290 refs::write_remote_ref(&mkit_dir, remote, &r.name, &h)?;
291 n += 1;
292 }
293 Ok(n)
294}
295
/// CAS lease policy for a default (current-branch → upstream) push.
///
/// All three policies are turned into a ref-write condition by
/// [`lease_condition`]; they differ in which extra checks
/// [`push_branch_tracked`] applies on top.
#[derive(Debug, Clone, Copy)]
pub enum PushLease {
    /// `--force` — unconditional `Any` write: no lease, no ancestry check.
    Force,
    /// `--force-with-lease` — require the remote tip to equal the local
    /// remote-tracking ref (`Match`), or `Missing` when there is none.
    ///
    /// Uses the same CAS lease as [`PushLease::FastForward`] but skips the
    /// ancestry check: it is the explicit, opt-in form that may overwrite a
    /// non-fast-forward branch *only* if the remote hasn't moved past what
    /// we last saw.
    WithLease,
    /// Default safe push: the same `Match` / `Missing` lease as
    /// `WithLease` (`Missing` on the first push of a branch), plus a true
    /// fast-forward requirement — when a remote-tracking ref exists, the
    /// new tip must descend from it.
    FastForward,
}
311
312/// Resolve the CAS condition for a single-branch push from the local
313/// remote-tracking ref `refs/remotes/<remote>/<branch>` and the lease
314/// policy.
315pub fn lease_condition(
316 cwd: &Path,
317 remote: &str,
318 branch: &str,
319 lease: PushLease,
320) -> Result<refs::RefWriteCondition, DispatchError> {
321 if matches!(lease, PushLease::Force) {
322 return Ok(refs::RefWriteCondition::Any);
323 }
324 let mkit_dir = cwd.join(mkit_core::MKIT_DIR);
325 Ok(match refs::read_remote_ref(&mkit_dir, remote, branch)? {
326 Some(tracked) => refs::RefWriteCondition::Match(tracked),
327 None => refs::RefWriteCondition::Missing,
328 })
329}
330
331/// Push the current branch to its upstream and, on success, advance the
332/// local remote-tracking ref `refs/remotes/<remote>/<branch>` to the
333/// pushed tip.
334///
335/// `remote` is the upstream remote NAME (for the tracking-ref
336/// namespace); `branch` is the local branch name; `remote_branch` is the
337/// branch name on the remote (`refs/heads/<remote_branch>`).
338pub fn push_branch_tracked(
339 cwd: &Path,
340 tx: &dyn Transport,
341 remote: &str,
342 branch: &str,
343 remote_branch: &str,
344 lease: PushLease,
345) -> Result<Hash, DispatchError> {
346 let mkit_dir = cwd.join(mkit_core::MKIT_DIR);
347 let store = crate::commands::open_store_configured(cwd)?;
348 let tip = refs::read_ref(&mkit_dir, branch)?
349 .ok_or_else(|| DispatchError::RemoteBranchMissing(branch.to_owned()))?;
350 // Default safe push requires a TRUE fast-forward: the new tip must
351 // descend from the last-seen remote-tracking ref. The CAS `Match`
352 // lease alone only proves the remote hasn't moved since we last
353 // fetched — on its own it would still let a divergent local tip
354 // (e.g. after a local `reset` to an unrelated commit) overwrite the
355 // remote, which Git rejects as non-fast-forward. `--force-with-lease`
356 // (`WithLease`) intentionally skips this check (overwrite as long as
357 // the remote matches what we last saw); `Force` skips everything.
358 if matches!(lease, PushLease::FastForward)
359 && let Some(tracked) = refs::read_remote_ref(&mkit_dir, remote, remote_branch)?
360 && !is_ancestor(&store, tracked, tip)?
361 {
362 return Err(DispatchError::NonFastForwardPush {
363 branch: remote_branch.to_owned(),
364 });
365 }
366 let condition = lease_condition(cwd, remote, remote_branch, lease)?;
367 push_branch(tx, &store, remote_branch, tip, condition)?;
368 refs::write_remote_ref(&mkit_dir, remote, remote_branch, &tip)?;
369 Ok(tip)
370}
371
372/// Push one branch: upload every object reachable from `tip` that the
373/// remote lacks, then CAS-write `refs/heads/<branch>` under `condition`.
374///
375/// On a CAS failure ([`TransportError::RefConflict`]) this returns
376/// [`DispatchError::NonFastForwardPush`] so callers can render an
377/// actionable fetch-then-retry hint. Does NOT touch local
378/// remote-tracking refs — the caller decides when to advance them.
379pub fn push_branch(
380 tx: &dyn Transport,
381 store: &ObjectStore,
382 branch: &str,
383 tip: Hash,
384 condition: refs::RefWriteCondition,
385) -> Result<(), DispatchError> {
386 // Walk the reachable set and figure out what the remote lacks.
387 // The current contract with the memory / file transports is one
388 // object per pack, keyed by the object's own digest. This keeps
389 // fetch simple (ask the remote for each hash as it walks the
390 // object graph) and means `pack_exists` is a per-object HEAD
391 // check against the same key we'd upload under.
392 let reachable = reachable_objects(store, &tip)?;
393 for obj in &reachable {
394 if crate::signal::is_shutdown() {
395 return Err(DispatchError::Interrupted);
396 }
397 let key = PackKey::from_hash(*obj);
398 if tx.pack_exists(&key)? {
399 continue;
400 }
401 let bytes = store.read(obj)?;
402 tx.upload_pack(&bytes, &key)?;
403 }
404 // Multi-object pack-level transfer (one pack per ref) is more
405 // efficient but requires the transport contract to advertise
406 // pack keys alongside refs — deferred. Per-object addressing
407 // keeps fetch simple and matches what file.rs / memory.rs
408 // already implement.
409 let full_name = format!("refs/heads/{branch}");
410 match tx.update_ref(&full_name, condition, &tip) {
411 Ok(()) => Ok(()),
412 Err(TransportError::RefConflict) => Err(DispatchError::NonFastForwardPush {
413 branch: branch.to_owned(),
414 }),
415 Err(e) => Err(e.into()),
416 }
417}
418
419/// Fetch remote refs, then fast-forward the current local branch from
420/// `refs/remotes/default/<branch>`. Fresh repos with no local branch tip
421/// initialise from the current branch's remote-tracking ref, or the first
422/// advertised remote branch when the current default branch is absent.
423pub fn pull_all(cwd: &Path, tx: &dyn Transport, remote: &str) -> Result<usize, DispatchError> {
424 let mkit_dir = cwd.join(mkit_core::MKIT_DIR);
425 // ONE repo lock across BOTH phases — the fetch (object write + remote
426 // refs) and the fast-forward (branch ref + HEAD + worktree). Validate
427 // the repo first for a clean non-repo error, and do NOT re-acquire the
428 // lock (it is a non-reentrant file lock): the fetch phase runs via the
429 // non-locking `fetch_objects`, not `fetch_all` (#267).
430 let store = crate::commands::open_store_configured(cwd)?;
431 let _lock = mkit_core::repo_lock::acquire_default(&mkit_dir, "worktree.lock")?;
432 let n = fetch_objects(&store, &mkit_dir, tx, remote)?;
433 let remote_refs = refs::list_remote_refs(&mkit_dir, remote)?
434 .into_iter()
435 .filter_map(|r| r.hash.map(|hash| (r.name, hash)))
436 .collect::<Vec<_>>();
437 if remote_refs.is_empty() {
438 return Ok(n);
439 }
440
441 let original_head = refs::read_head(&mkit_dir).ok();
442 let (branch, local_tip, remote_tip) = match &original_head {
443 Some(Head::Branch(branch)) => {
444 let local_tip = refs::read_ref(&mkit_dir, branch)?;
445 let selected = if local_tip.is_some() {
446 remote_refs
447 .iter()
448 .find(|(name, _)| name == branch)
449 .ok_or_else(|| DispatchError::RemoteBranchMissing(branch.clone()))?
450 } else {
451 remote_refs
452 .iter()
453 .find(|(name, _)| name == branch)
454 .unwrap_or(&remote_refs[0])
455 };
456 (selected.0.clone(), local_tip, selected.1)
457 }
458 Some(Head::Detached(_)) => return Err(DispatchError::DetachedHead),
459 None => (remote_refs[0].0.clone(), None, remote_refs[0].1),
460 };
461
462 let ref_condition = if let Some(local_tip) = local_tip {
463 if local_tip == remote_tip {
464 return Ok(n);
465 }
466 if !is_ancestor(&store, local_tip, remote_tip)? {
467 return Err(DispatchError::NonFastForwardPull { branch });
468 }
469 refs::RefWriteCondition::Match(local_tip)
470 } else {
471 refs::RefWriteCondition::Missing
472 };
473
474 let tree = load_tree_hash(&store, remote_tip)?;
475 crate::commands::ensure_restore_safe(cwd, &store, tree)
476 .map_err(DispatchError::RestoreSafety)?;
477 crate::commands::write_ref_recording_history(&mkit_dir, &branch, ref_condition, &remote_tip)?;
478 if let Err(e) = refs::write_head_branch(&mkit_dir, &branch) {
479 rollback_pull_ref(&mkit_dir, &branch, local_tip, remote_tip)?;
480 return Err(e.into());
481 }
482 if let Err(e) = crate::commands::restore_worktree_and_index(cwd, &store, tree) {
483 if let Err(rollback) =
484 rollback_pull_ref_and_head(&mkit_dir, &branch, local_tip, remote_tip, original_head)
485 {
486 return Err(DispatchError::RestoreSafety(format!(
487 "{e}; additionally failed to roll back ref: {rollback}"
488 )));
489 }
490 return Err(DispatchError::RestoreSafety(e));
491 }
492 Ok(n)
493}
494
495fn rollback_pull_ref_and_head(
496 mkit_dir: &Path,
497 branch: &str,
498 local_tip: Option<Hash>,
499 remote_tip: Hash,
500 original_head: Option<Head>,
501) -> Result<(), String> {
502 rollback_pull_ref(mkit_dir, branch, local_tip, remote_tip).map_err(|e| e.to_string())?;
503 match original_head {
504 Some(Head::Branch(name)) => refs::write_head_branch(mkit_dir, &name),
505 Some(Head::Detached(hash)) => refs::write_head_detached(mkit_dir, &hash),
506 None => Ok(()),
507 }
508 .map_err(|e| e.to_string())
509}
510
511fn rollback_pull_ref(
512 mkit_dir: &Path,
513 branch: &str,
514 local_tip: Option<Hash>,
515 remote_tip: Hash,
516) -> Result<(), refs::RefError> {
517 if let Some(local_tip) = local_tip {
518 crate::commands::write_ref_recording_history(
519 mkit_dir,
520 branch,
521 refs::RefWriteCondition::Match(remote_tip),
522 &local_tip,
523 )
524 } else if refs::read_ref(mkit_dir, branch)? == Some(remote_tip) {
525 refs::delete_ref(mkit_dir, branch)
526 } else {
527 Ok(())
528 }
529}
530
531/// `fetch` — `pull_all` without the HEAD update. Downloads every object
532/// reachable from each remote ref (via [`Transport::download_pack`] on
533/// the object's own digest) and writes the ref into
534/// `refs/remotes/default/<branch>`.
535pub fn fetch_all(cwd: &Path, tx: &dyn Transport, remote: &str) -> Result<usize, DispatchError> {
536 let mkit_dir = cwd.join(mkit_core::MKIT_DIR);
537 // Validate the repo BEFORE locking so a non-repo reports cleanly rather
538 // than as a lock error, then hold the repo lock across the whole
539 // object-write + remote-ref-publish window. This serializes fetch
540 // against `gc` so a concurrent `gc --grace-secs 0` can't prune freshly
541 // downloaded objects before their refs make them reachable (#267).
542 let store = crate::commands::open_store_configured(cwd)?;
543 let _lock = mkit_core::repo_lock::acquire_default(&mkit_dir, "worktree.lock")?;
544 fetch_objects(&store, &mkit_dir, tx, remote)
545}
546
547/// Download every remote `refs/heads/*` object closure and publish the
548/// remote-tracking refs. The caller MUST already hold the repo lock (so the
549/// object writes and ref publication are serialized against `gc`).
550fn fetch_objects(
551 store: &ObjectStore,
552 mkit_dir: &Path,
553 tx: &dyn Transport,
554 remote: &str,
555) -> Result<usize, DispatchError> {
556 let remote_refs = tx.list_refs("refs/heads/")?;
557 let mut n = 0;
558 for r in remote_refs {
559 if crate::signal::is_shutdown() {
560 return Err(DispatchError::Interrupted);
561 }
562 let Some(h) = r.hash else { continue };
563 // Download the pack the remote uploaded for this ref. The
564 // per-object fallback below handles the case where the pack
565 // was never assembled (a push whose reachable set was empty
566 // because the remote already had everything).
567 //
568 // The push path uploads one pack keyed by its own digest; the
569 // memory / file transports `list_refs` only returns the ref,
570 // not the pack digest. So we walk commit→tree→blobs on the
571 // *local* side after downloading, re-using `download_pack` on
572 // each object's hash as a fallback. That matches the
573 // per-object transport semantics in file.rs / memory.rs.
574 fetch_object_closure(store, tx, &h)?;
575 refs::write_remote_ref(mkit_dir, remote, &r.name, &h)?;
576 n += 1;
577 }
578 Ok(n)
579}
580
581fn load_tree_hash(store: &ObjectStore, commit_hash: Hash) -> Result<Hash, DispatchError> {
582 match store.read_object(&commit_hash)? {
583 Object::Commit(c) => Ok(c.tree_hash),
584 Object::Remix(r) => Ok(r.tree_hash),
585 _ => Err(DispatchError::NotCommit),
586 }
587}
588
589/// Recursively download every object reachable from `root` into
590/// `store`, fetching one digest at a time. Used by [`fetch_all`] /
591/// [`pull_all`] when the remote's ref-advertise doesn't carry a
592/// pack key (which is the current contract for the memory + file
593/// transports).
594fn fetch_object_closure(
595 store: &ObjectStore,
596 tx: &dyn Transport,
597 root: &Hash,
598) -> Result<(), DispatchError> {
599 use std::collections::VecDeque;
600
601 let mut queue: VecDeque<Hash> = VecDeque::new();
602 queue.push_back(*root);
603 let mut seen = std::collections::HashSet::new();
604
605 while let Some(h) = queue.pop_front() {
606 if crate::signal::is_shutdown() {
607 return Err(DispatchError::Interrupted);
608 }
609 if !seen.insert(h) {
610 continue;
611 }
612 if store.contains(&h) {
613 // Already local — still walk to be sure children are
614 // present. Read through the store to enqueue them.
615 } else {
616 // Download. Transports that keyed on the object digest
617 // (memory, file) return raw object bytes here.
618 let key = PackKey::from_hash(h);
619 let bytes = match tx.download_pack(&key) {
620 Ok(b) => b,
621 Err(TransportError::PackNotFound) => {
622 // Accept as a no-op: the remote may have assembled
623 // a multi-object pack and thus does not key on the
624 // object digest. The per-object path can't see the
625 // pack in that case; the caller is expected to
626 // either download the pack explicitly (future: a
627 // proper ref-advertise carrying pack keys) or
628 // accept missing objects. For the memory/file
629 // transports the per-object mapping is always
630 // populated by the push side, so this branch is
631 // defensive.
632 continue;
633 }
634 Err(e) => return Err(e.into()),
635 };
636 // If the remote returned a real packfile, unpack it. Else,
637 // treat the bytes as a single raw object.
638 if bytes.starts_with(mkit_core::pack::MAGIC) {
639 let _ = PackReader::read(&bytes, store)?;
640 } else {
641 let stored = store.write(&bytes)?;
642 // Sanity check: the digest MUST match the key we asked
643 // for — otherwise the remote is serving mismatched
644 // content.
645 if stored != h {
646 return Err(DispatchError::Transport(TransportError::InvalidResponse));
647 }
648 }
649 }
650 // Enqueue children so we walk the whole closure.
651 if let Ok(obj) = store.read_object(&h) {
652 use mkit_core::object::Object;
653 match obj {
654 Object::Commit(c) => {
655 queue.push_back(c.tree_hash);
656 for p in c.parents {
657 queue.push_back(p);
658 }
659 }
660 Object::Remix(r) => {
661 queue.push_back(r.tree_hash);
662 for p in r.parents {
663 queue.push_back(p);
664 }
665 }
666 Object::Tree(t) => {
667 for e in t.entries {
668 queue.push_back(e.object_hash);
669 }
670 }
671 Object::ChunkedBlob(cb) => {
672 for c in cb.chunks {
673 queue.push_back(c);
674 }
675 }
676 Object::Tag(t) => {
677 queue.push_back(t.target);
678 }
679 Object::Blob(_) | Object::Delta(_) => {}
680 }
681 }
682 }
683 Ok(())
684}