Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    io::{self, Seek, SeekFrom, Write},
4    sync::{
5        Arc, Mutex,
6        atomic::{AtomicUsize, Ordering},
7    },
8    time::{Duration, Instant},
9};
10
11use tempfile::NamedTempFile;
12
13use grpc::heddle::v1::{
14    GetBlobRequest, GitCheckpointTransfer, GitLaneTransfer, GitPackTransfer,
15    GitRefKind as GrpcGitRefKind,
16    GitRefUpdateTransfer, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
17    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
18    RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
19    ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
20    git_lane_transfer, pull_message, push_message,
21};
22use objects::{
23    Progress,
24    object::{ChangeId, ContentHash, MarkerName, ThreadName},
25    store::{AnyStore, ObjectStore, PackObjectId},
26};
27use repo::{
28    GitRefKind as ClassifiedGitRefKind, GitRefName, Repository, RepositoryCapability,
29    RevisionAddress, SyncedThreadMetadata,
30    ThreadManager,
31};
32use sley::{
33    ObjectId as GitObjectId, RefPrecondition, ReferenceTarget, Repository as SleyRepository,
34};
35use tokio::sync::mpsc;
36use tokio_stream::wrappers::ReceiverStream;
37use tonic::Request;
38use wire::{
39    GitLaneTransferIntent, ObjectInfo, ObjectType, ObjectTypeBucket, PlannedObject, ProtocolError,
40    PullComplete, PushComplete, RefEntry, RefUpdated, RepositoryTransferPlan,
41};
42
43use super::{
44    HostedGrpcClient, PullMaterialization,
45    helpers::{
46        descriptor_id, descriptor_id_from_info, object_descriptor_with_status, object_type_name,
47        parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
48        transport_mode_name,
49    },
50};
51
/// Per-call knobs shared by every pull entry point on `HostedGrpcClient`.
///
/// The public `pull*` / `fetch_state*` wrappers build one of these and hand it
/// to `pull_exchange`, which copies the values into the outgoing `PullRequest`.
#[derive(Clone, Copy)]
struct PullOptions<'a> {
    /// Explicit local thread name; sent as `local_thread` (empty string when
    /// `None`) and selects which completeness gate picks the advertised head.
    local_thread: Option<&'a str>,
    /// Optional depth; sent as `depth`, with `None` encoded as the default `0`.
    depth: Option<u32>,
    /// Optional state to fetch; sent both as `target_state` and as a heddle
    /// revision address (empty strings when `None`).
    target_state: Option<ChangeId>,
    /// Materialization mode; its `allows_partial_fetch()` result becomes the
    /// request's `allow_partial_fetch` flag.
    materialization: PullMaterialization,
}
59
/// Result of `plan_pull_wants`: what the client will ask the server for after
/// it has received `PullReady`.
struct PullWantPlan {
    /// Object descriptors to request from the server.
    wants: Vec<ObjectDescriptor>,
    /// Transfer plan covering the wanted objects; consulted (together with
    /// `want_full_closure`) to decide whether a native pack is required.
    transfer_plan: RepositoryTransferPlan<ObjectInfo>,
    /// Object types recorded per pack object id.
    /// NOTE(review): presumably used to validate/classify received objects —
    /// confirm against the receive path.
    wanted_types: WantedTypes,
    /// Whether the plan asks for the full closure rather than an explicit list.
    /// NOTE(review): exact semantics live in `plan_pull_wants` — confirm there.
    want_full_closure: bool,
}
66
/// Lookup from a pack object id to the object types associated with it in a
/// pull plan (one id may carry several types, hence the `Vec`).
type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
68
/// Everything the git lane contributes to a push: the pack to upload plus the
/// ref updates to stream after it.
struct GitLanePushPlan {
    /// Revision address advertised as `local_revision_address` in the
    /// `PushRequest` when this plan is used.
    local_revision_address: String,
    /// The single pack streamed before any ref update.
    pack: GitPackPushPlan,
    /// The N ref updates streamed after the single multi-root pack, one per
    /// direct git-overlay ref (Branch/Tag/Note/Other). Every entry carries
    /// `checkpoint: None` — the discriminator the weft server uses to admit
    /// checkpoint-less multi-ref pushes. Per-ref compare-and-set expectations
    /// are pre-applied from the server `ListRefs` response.
    ref_updates: Vec<PushMessage>,
}
79
/// A git pack prepared for upload. Cloning is cheap for the payload itself:
/// the pack bytes live in a shared temp file behind an `Arc`.
#[derive(Clone)]
struct GitPackPushPlan {
    /// Identifier of the transfer this pack belongs to.
    transfer_id: String,
    /// Raw identity bytes of the pack.
    /// NOTE(review): presumably the pack checksum/hash — confirm in the planner.
    pack_id: Vec<u8>,
    /// Size of the pack. NOTE(review): presumably in bytes — confirm.
    pack_size: u64,
    /// Root oids the pack is built from. The native path has exactly one
    /// root (the checkpoint commit); the mirror path has one per resolved
    /// ref target. The reachable-pack plan packs the transitive closure of
    /// every root into a single pack.
    #[cfg_attr(not(test), allow(dead_code))]
    roots: Vec<GitObjectId>,
    /// Reachable pack bytes written once during planning; streamed on the wire
    /// without a second ODB traversal. Auto-deleted when the last clone drops.
    pack_file: Arc<Mutex<NamedTempFile>>,
}
95
/// Object-count threshold handed to
/// `wire::enumerate_state_closure_transfer_with_options` when planning a push.
/// NOTE(review): presumably the closure size up to which full descriptors are
/// materialized instead of compact planned objects — confirm in `wire`.
const PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD: usize = 512;
/// NOTE(review): presumably the object count above which a pulled pack is
/// spooled rather than held in memory — not used in the visible code; confirm.
const PULL_PACK_SPOOL_OBJECT_THRESHOLD: usize = 512;
/// NOTE(review): presumably how many objects are packed between drains of the
/// native pack stream — not used in the visible code; confirm.
const NATIVE_PACK_DRAIN_OBJECT_INTERVAL: usize = 32;
/// NOTE(review): presumably the cap on objects prefetched ahead of the native
/// pack writer — not used in the visible code; confirm.
const NATIVE_PACK_OBJECT_PREFETCH_LIMIT: usize = 32;
/// NOTE(review): presumably the cap on concurrent object-load workers for
/// native packs — not used in the visible code; confirm.
const NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT: usize = 8;
101
/// Per-bucket tally of the objects seen during a pull. Each field counts the
/// objects whose `ObjectType::bucket()` matched the corresponding
/// `ObjectTypeBucket` variant; all counters start at zero via `Default`.
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Objects in the `Blob` bucket.
    pub blobs: usize,
    /// Objects in the `Tree` bucket.
    pub trees: usize,
    /// Objects in the `State` bucket.
    pub states: usize,
    /// Objects in the `Action` bucket.
    pub actions: usize,
    /// Objects in the `Redaction` bucket.
    pub redactions: usize,
    /// Objects in the `StateVisibility` bucket.
    pub state_visibilities: usize,
}
111
/// One ref as reported by the hosted `ListRefs` RPC, including the revision
/// address that the plain `RefEntry` type does not carry.
#[derive(Debug, Clone)]
pub struct HostedRefEntry {
    /// Ref name exactly as returned by the server.
    pub name: String,
    /// Change id the ref points at, decoded from the response bytes.
    pub change_id: ChangeId,
    /// Server-reported flag marking the ref as a thread.
    pub is_thread: bool,
    /// Server-reported revision address string for the ref (passed through
    /// unparsed).
    pub revision_address: String,
}
119
120impl PullObjectMix {
121    fn record(&mut self, obj_type: ObjectType) {
122        match obj_type.bucket() {
123            ObjectTypeBucket::Blob => self.blobs += 1,
124            ObjectTypeBucket::Tree => self.trees += 1,
125            ObjectTypeBucket::State => self.states += 1,
126            ObjectTypeBucket::Action => self.actions += 1,
127            ObjectTypeBucket::Redaction => self.redactions += 1,
128            ObjectTypeBucket::StateVisibility => self.state_visibilities += 1,
129        }
130    }
131
132    pub fn total(&self) -> usize {
133        self.blobs
134            + self.trees
135            + self.states
136            + self.actions
137            + self.redactions
138            + self.state_visibilities
139    }
140}
141
/// Timing and volume measurements collected over one pull exchange. All
/// fields start at their zero value via `Default`.
///
/// NOTE(review): only `ready_wait` is populated in the code visible here; the
/// remaining field descriptions are inferred from their names — confirm
/// against the receive loop.
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until `PullReady` was received.
    pub ready_wait: Duration,
    /// Presumably the time spent receiving and applying the object stream.
    pub receive_and_apply: Duration,
    /// Presumably the total decode time.
    pub decode: Duration,
    /// Presumably the time spent handing received objects to the store.
    pub store_receive_object: Duration,
    /// Presumably the time spent syncing thread/marker metadata.
    pub metadata_sync: Duration,
    /// Presumably decode + apply time for pack-transported objects.
    pub pack_decode_apply: Duration,
    /// Presumably decode + apply time for raw-transported objects.
    pub raw_decode_apply: Duration,
    /// Presumably decode-only time for pack-transported objects.
    pub pack_decode: Duration,
    /// Presumably decode-only time for raw-transported objects.
    pub raw_decode: Duration,
    /// Presumably the total payload bytes received.
    pub bytes_received: usize,
    /// Presumably the payload bytes received via packs.
    pub pack_bytes_received: usize,
    /// Presumably the payload bytes received as raw objects.
    pub raw_bytes_received: usize,
    /// Presumably the number of objects received.
    pub objects_received: usize,
    /// Per-bucket breakdown of the objects seen.
    pub object_mix: PullObjectMix,
}
159
160impl HostedGrpcClient {
161    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
162        Ok(self
163            .list_refs_with_revision_addresses(repo_path)
164            .await?
165            .into_iter()
166            .map(|entry| RefEntry {
167                name: entry.name,
168                change_id: entry.change_id,
169                is_thread: entry.is_thread,
170            })
171            .collect())
172    }
173
174    pub async fn list_refs_with_revision_addresses(
175        &mut self,
176        repo_path: &str,
177    ) -> Result<Vec<HostedRefEntry>, ProtocolError> {
178        let mut request = Request::new(ListRefsRequest {
179            repo_path: repo_path.to_string(),
180        });
181        self.apply_signed_auth(&mut request, "/heddle.v1.RepoSyncService/ListRefs")?;
182        let response = self
183            .inner
184            .list_refs(request)
185            .await
186            .map_err(status_to_protocol_error)?
187            .into_inner();
188        response
189            .refs
190            .into_iter()
191            .map(|entry| {
192                Ok(HostedRefEntry {
193                    name: entry.name,
194                    change_id: ChangeId::try_from_slice(&entry.change_id)
195                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
196                    is_thread: entry.is_thread,
197                    revision_address: entry.revision_address,
198                })
199            })
200            .collect()
201    }
202
203    #[allow(clippy::too_many_arguments)]
204    pub async fn update_ref(
205        &mut self,
206        repo_path: &str,
207        name: &str,
208        is_thread: bool,
209        old_value: Option<ChangeId>,
210        new_value: ChangeId,
211        force: bool,
212        thread_metadata: Option<&SyncedThreadMetadata>,
213    ) -> Result<RefUpdated, ProtocolError> {
214        let mut request = Request::new(UpdateRefRequest {
215            repo_path: repo_path.to_string(),
216            name: name.to_string(),
217            is_thread,
218            force,
219            old_value: old_value
220                .map(|value| value.to_string_full())
221                .unwrap_or_default(),
222            new_value: new_value.to_string_full(),
223            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
224            old_revision_address: old_value
225                .map(|value| RevisionAddress::heddle(value).to_string())
226                .unwrap_or_default(),
227            new_revision_address: RevisionAddress::heddle(new_value).to_string(),
228            client_operation_id: String::new(),
229        });
230        self.apply_signed_auth(&mut request, "/heddle.v1.RepoSyncService/UpdateRef")?;
231        let response = self
232            .inner
233            .update_ref(request)
234            .await
235            .map_err(status_to_protocol_error)?
236            .into_inner();
237        Ok(RefUpdated {
238            success: response.success,
239            old_value: if response.old_value.is_empty() {
240                None
241            } else {
242                Some(
243                    ChangeId::parse(&response.old_value)
244                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
245                )
246            },
247            error: (!response.error.is_empty()).then_some(response.error),
248        })
249    }
250
251    pub async fn push(
252        &mut self,
253        repo: &Repository,
254        repo_path: &str,
255        local_state: ChangeId,
256        target_thread: &str,
257        force: bool,
258    ) -> Result<PushComplete, ProtocolError> {
259        self.push_with_revision(
260            repo,
261            repo_path,
262            local_state,
263            target_thread,
264            force,
265            RevisionAddress::heddle(local_state).to_string(),
266            None,
267            &Progress::null(),
268        )
269        .await
270    }
271
272    /// Push ALL git-overlay refs (every branch, tag, note, and other ref)
273    /// in one shot: a single multi-root pack followed by N checkpoint-less
274    /// ref updates (git-mirror mode). This is the DEFAULT hosted git-overlay
275    /// push path (#846) — the git format is shipped straight through weft's
276    /// git lane with no native conversion. Native heddle conversion stays
277    /// opt-in via `heddle adopt`.
278    ///
279    /// Per-ref remote expectations are read from the server `ListRefs`
280    /// response so each ref update carries the compare-and-set precondition
281    /// the server currently holds.
282    ///
283    /// `progress` drives the live push line (packing → uploading bytes →
284    /// writing N refs); pass [`Progress::null`] for machine-readable / non-TTY
285    /// callers.
286    pub async fn push_git_overlay_mirror(
287        &mut self,
288        repo: &Repository,
289        repo_path: &str,
290        local_state: ChangeId,
291        target_thread: &str,
292        force: bool,
293        progress: &Progress,
294    ) -> Result<PushComplete, ProtocolError> {
295        progress.set_phase("packing refs");
296        let remote_ref_expectations = self.git_mirror_ref_expectations(repo_path).await?;
297        let git_lane = build_git_mirror_push_plan(
298            repo,
299            self.transport.chunk_size.max(1),
300            &remote_ref_expectations,
301        )?;
302        let local_revision_address = git_lane.local_revision_address.clone();
303        self.push_with_revision(
304            repo,
305            repo_path,
306            local_state,
307            target_thread,
308            force,
309            local_revision_address,
310            Some(git_lane),
311            progress,
312        )
313        .await
314    }
315
316    /// Fetch the server's current ref → git-revision-address map so the
317    /// mirror plan can attach per-ref compare-and-set expectations. Refs the
318    /// server does not know about are treated as expected-missing (create).
319    async fn git_mirror_ref_expectations(
320        &mut self,
321        repo_path: &str,
322    ) -> Result<HashMap<String, GitRefRemoteExpectation>, ProtocolError> {
323        let remote_refs = self.list_refs_with_revision_addresses(repo_path).await?;
324        let mut expectations = HashMap::with_capacity(remote_refs.len());
325        for entry in remote_refs {
326            let expectation = parse_git_ref_expectation(&entry.revision_address)?;
327            expectations.insert(entry.name, expectation);
328        }
329        Ok(expectations)
330    }
331
332    #[allow(clippy::too_many_arguments)]
333    async fn push_with_revision(
334        &mut self,
335        repo: &Repository,
336        repo_path: &str,
337        local_state: ChangeId,
338        target_thread: &str,
339        force: bool,
340        local_revision_address: String,
341        git_lane: Option<GitLanePushPlan>,
342        progress: &Progress,
343    ) -> Result<PushComplete, ProtocolError> {
344        let _ = self.transport.chunk_size;
345        let _ = self.transport.resume_attempts;
346        // TODO: Gate hosted Git-lane transfer planning on the Sley reachable-pack
347        // facade. Keep this as ExistingImplementation until Sley can return the
348        // exact pack identity and stream from one boundary; do not add a
349        // Heddle-local reachable-pack planner or wire variant here.
350        let git_lane_intent = if git_lane.is_some() {
351            GitLaneTransferIntent::ExistingImplementation
352        } else {
353            GitLaneTransferIntent::HeddleObjectsOnly
354        };
355        let closure = wire::enumerate_state_closure_transfer_with_options(
356            repo.store(),
357            local_state,
358            wire::StateClosureOptions::default(),
359            PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD,
360        )?;
361        let object_plan =
362            RepositoryTransferPlan::from_planned_objects(closure.planned_objects, git_lane_intent);
363        let full_objects = closure.full_objects;
364        let object_count = full_objects
365            .as_ref()
366            .map_or(object_plan.stats.total_objects, std::vec::Vec::len);
367        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
368        let transport_mode = preferred_transport_mode(&self.transport, object_count);
369        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
370        let request_message = PushMessage {
371            body: Some(push_message::Body::Request(PushRequest {
372                repo_path: repo_path.to_string(),
373                local_state: local_state.to_string_full(),
374                target_thread: target_thread.to_string(),
375                create_thread: true,
376                force,
377                objects: full_objects.as_ref().map_or_else(
378                    || {
379                        object_plan
380                            .partitions
381                            .iter()
382                            .map(to_proto_planned_object)
383                            .collect()
384                    },
385                    |objects| objects.iter().map(to_proto_object_info).collect(),
386                ),
387                transfer: Some(self.transport.transfer_checkpoint_with_mode(
388                    transfer_id.clone(),
389                    transport_mode,
390                    0,
391                    0,
392                    false,
393                )),
394                partial_fetch_status: partial_fetch_status_for_repo(repo),
395                allow_partial_fetch: true,
396                thread_metadata: thread_metadata
397                    .map(|metadata| to_proto_thread_metadata(&metadata)),
398                local_revision_address,
399                client_operation_id: String::new(),
400            })),
401        };
402
403        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
404        tx.send(request_message).await.map_err(|_| {
405            ProtocolError::InvalidState("failed to initialize push stream".to_string())
406        })?;
407        let mut request = Request::new(ReceiverStream::new(rx));
408        self.apply_auth(&mut request)?;
409        let mut response = self
410            .inner
411            .push(request)
412            .await
413            .map_err(status_to_protocol_error)?
414            .into_inner();
415
416        let ready = match response.message().await.map_err(status_to_protocol_error)? {
417            Some(PushMessage {
418                body: Some(push_message::Body::Ready(ready)),
419            }) => ready,
420            _ => {
421                return Err(ProtocolError::InvalidState(
422                    "expected PushReady from gRPC server".to_string(),
423                ));
424            }
425        };
426        let object_index = match full_objects {
427            Some(objects) => objects
428                .into_iter()
429                .map(|info| (descriptor_id_from_info(&info), info))
430                .collect::<HashMap<_, _>>(),
431            None => object_plan
432                .partitions
433                .iter()
434                .map(|object| {
435                    (
436                        descriptor_id_from_plan(object),
437                        object_info_from_plan(object),
438                    )
439                })
440                .collect::<HashMap<_, _>>(),
441        };
442
443        let ready_transport_mode = ready
444            .transfer
445            .as_ref()
446            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
447            .unwrap_or(transport_mode);
448        let wanted_infos = ready
449            .want_objects
450            .into_iter()
451            .map(|want| {
452                object_index
453                    .get(&descriptor_id(&want))
454                    .cloned()
455                    .ok_or_else(|| {
456                        ProtocolError::InvalidState("server requested unknown object".to_string())
457                    })
458            })
459            .collect::<Result<Vec<_>, _>>()?;
460
461        let wanted_plan = RepositoryTransferPlan::from_object_infos(wanted_infos, git_lane_intent);
462
463        if !wanted_plan.partitions.packable_objects.is_empty() {
464            send_native_pack_streaming_messages(
465                &tx,
466                repo,
467                &wanted_plan.partitions.packable_objects,
468                &transfer_id,
469                self.transport.chunk_size.max(1),
470                &self.transport,
471                ready_transport_mode,
472            )
473            .await?;
474        }
475
476        for info in wanted_plan.partitions.sidecar_objects {
477            let message = sidecar_push_message(repo, info)?;
478            tx.send(message).await.map_err(|_| {
479                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
480            })?;
481        }
482
483        if let Some(git_lane) = git_lane {
484            // One multi-root pack (live "uploading" progress), then N
485            // checkpoint-less ref updates (git-mirror mode).
486            send_git_pack_streaming_messages(
487                &tx,
488                &git_lane.pack,
489                self.transport.chunk_size.max(1),
490                progress,
491            )
492            .await?;
493            progress.set_phase(format!("writing {} refs", git_lane.ref_updates.len()));
494            for ref_update in git_lane.ref_updates {
495                tx.send(ref_update).await.map_err(|_| {
496                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
497                })?;
498            }
499        }
500        drop(tx);
501
502        let result = match response.message().await.map_err(status_to_protocol_error)? {
503            Some(PushMessage {
504                body: Some(push_message::Body::Complete(complete)),
505            }) => PushComplete {
506                success: complete.success,
507                new_state: if complete.new_state.is_empty() {
508                    None
509                } else {
510                    Some(
511                        ChangeId::parse(&complete.new_state)
512                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
513                    )
514                },
515                error: (!complete.error.is_empty()).then_some(complete.error),
516                transfer_id: complete
517                    .transfer
518                    .as_ref()
519                    .map(|transfer| transfer.transfer_id.clone())
520                    .unwrap_or_default(),
521                transport_mode: complete
522                    .transfer
523                    .as_ref()
524                    .map(|transfer| transport_mode_name(transfer.transport_mode))
525                    .unwrap_or("raw-objects")
526                    .to_string(),
527                resume_offset: complete
528                    .transfer
529                    .as_ref()
530                    .map(|transfer| transfer.resume_offset)
531                    .unwrap_or_default(),
532                chunk_index: complete
533                    .transfer
534                    .as_ref()
535                    .map(|transfer| transfer.chunk_index)
536                    .unwrap_or_default(),
537                checkpoint: complete
538                    .transfer
539                    .as_ref()
540                    .map(|transfer| transfer.checkpoint.clone())
541                    .unwrap_or_default(),
542                is_complete: complete
543                    .transfer
544                    .as_ref()
545                    .map(|transfer| transfer.is_complete)
546                    .unwrap_or(false),
547            },
548            _ => {
549                return Err(ProtocolError::InvalidState(
550                    "expected PushComplete from gRPC server".to_string(),
551                ));
552            }
553        };
554
555        if result.success {
556            self.sync_remote_markers(repo, repo_path, local_state)
557                .await?;
558        }
559        Ok(result)
560    }
561
562    pub async fn pull(
563        &mut self,
564        repo: &Repository,
565        repo_path: &str,
566        remote_thread: &str,
567        local_thread: Option<&str>,
568    ) -> Result<PullComplete, ProtocolError> {
569        self.pull_with_options(
570            repo,
571            repo_path,
572            remote_thread,
573            PullOptions {
574                local_thread,
575                depth: None,
576                target_state: None,
577                materialization: PullMaterialization::Full,
578            },
579        )
580        .await
581    }
582
583    pub async fn pull_profiled(
584        &mut self,
585        repo: &Repository,
586        repo_path: &str,
587        remote_thread: &str,
588        local_thread: Option<&str>,
589    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
590        self.pull_exchange(
591            repo,
592            repo_path,
593            remote_thread,
594            PullOptions {
595                local_thread,
596                depth: None,
597                target_state: None,
598                materialization: PullMaterialization::Full,
599            },
600        )
601        .await
602        .map(|exchange| (exchange.result, exchange.profile))
603    }
604
605    pub async fn pull_partial(
606        &mut self,
607        repo: &Repository,
608        repo_path: &str,
609        remote_thread: &str,
610        local_thread: Option<&str>,
611    ) -> Result<PullComplete, ProtocolError> {
612        self.pull_with_options(
613            repo,
614            repo_path,
615            remote_thread,
616            PullOptions {
617                local_thread,
618                depth: None,
619                target_state: None,
620                materialization: PullMaterialization::Lazy,
621            },
622        )
623        .await
624    }
625
626    pub async fn pull_with_depth_and_materialization(
627        &mut self,
628        repo: &Repository,
629        repo_path: &str,
630        remote_thread: &str,
631        local_thread: Option<&str>,
632        depth: Option<u32>,
633        materialization: PullMaterialization,
634    ) -> Result<PullComplete, ProtocolError> {
635        self.pull_with_options(
636            repo,
637            repo_path,
638            remote_thread,
639            PullOptions {
640                local_thread,
641                depth,
642                target_state: None,
643                materialization,
644            },
645        )
646        .await
647    }
648
649    pub async fn pull_with_depth(
650        &mut self,
651        repo: &Repository,
652        repo_path: &str,
653        remote_thread: &str,
654        local_thread: Option<&str>,
655        depth: Option<u32>,
656    ) -> Result<PullComplete, ProtocolError> {
657        self.pull_with_depth_and_materialization(
658            repo,
659            repo_path,
660            remote_thread,
661            local_thread,
662            depth,
663            PullMaterialization::Full,
664        )
665        .await
666    }
667
668    pub async fn fetch_state(
669        &mut self,
670        repo: &Repository,
671        repo_path: &str,
672        remote_thread: &str,
673        target_state: ChangeId,
674    ) -> Result<usize, ProtocolError> {
675        self.pull_exchange(
676            repo,
677            repo_path,
678            remote_thread,
679            PullOptions {
680                local_thread: None,
681                depth: None,
682                target_state: Some(target_state),
683                materialization: PullMaterialization::Full,
684            },
685        )
686        .await
687        .map(|exchange| exchange.object_count)
688    }
689
690    pub async fn fetch_state_partial(
691        &mut self,
692        repo: &Repository,
693        repo_path: &str,
694        remote_thread: &str,
695        target_state: ChangeId,
696    ) -> Result<usize, ProtocolError> {
697        self.pull_exchange(
698            repo,
699            repo_path,
700            remote_thread,
701            PullOptions {
702                local_thread: None,
703                depth: None,
704                target_state: Some(target_state),
705                materialization: PullMaterialization::Lazy,
706            },
707        )
708        .await
709        .map(|exchange| exchange.object_count)
710    }
711
712    pub async fn hydrate_blob_at_path(
713        &mut self,
714        repo: &Repository,
715        repo_path: &str,
716        reference: &str,
717        path: &str,
718    ) -> Result<objects::object::Blob, ProtocolError> {
719        let mut request = Request::new(GetBlobRequest {
720            repo_path: repo_path.to_string(),
721            r#ref: reference.to_string(),
722            path: path.to_string(),
723        });
724        self.apply_signed_auth(&mut request, "/heddle.v1.ContentService/GetBlob")?;
725        let response = self
726            .content
727            .get_blob(request)
728            .await
729            .map_err(status_to_protocol_error)?
730            .into_inner();
731
732        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
733        let blob = objects::object::Blob::new(content);
734        repo.store().put_blob(&blob)?;
735        repo.clear_missing_blob(&blob.hash())?;
736        Ok(blob)
737    }
738
739    pub async fn hydrate_missing_blobs_for_state(
740        &mut self,
741        repo: &Repository,
742        repo_path: &str,
743        remote_thread: &str,
744        target_state: ChangeId,
745    ) -> Result<usize, ProtocolError> {
746        let exchange = self
747            .pull_exchange(
748                repo,
749                repo_path,
750                remote_thread,
751                PullOptions {
752                    local_thread: None,
753                    depth: None,
754                    target_state: Some(target_state),
755                    materialization: PullMaterialization::Full,
756                },
757            )
758            .await?;
759        clear_missing_blobs_for_state(repo, target_state)?;
760        Ok(exchange.object_count)
761    }
762
763    async fn pull_with_options(
764        &mut self,
765        repo: &Repository,
766        repo_path: &str,
767        remote_thread: &str,
768        options: PullOptions<'_>,
769    ) -> Result<PullComplete, ProtocolError> {
770        self.pull_exchange(repo, repo_path, remote_thread, options)
771            .await
772            .map(|exchange| exchange.result)
773    }
774
775    async fn pull_exchange(
776        &mut self,
777        repo: &Repository,
778        repo_path: &str,
779        remote_thread: &str,
780        options: PullOptions<'_>,
781    ) -> Result<PullExchange, ProtocolError> {
782        let exchange_start = Instant::now();
783        let mut exclude_states = Vec::new();
784        // Whether the head comes from an explicit `--local-thread` or is
785        // inferred from the bare remote thread, it is advertised as
786        // `exclude_states` ONLY when its full object closure is provably
787        // present locally. The server trusts `exclude_states` blindly and
788        // prunes the advertised closure — so advertising a head whose closure
789        // we lack (a partial/lazy clone, an interrupted prior pull) would make
790        // the server omit those objects and silently leave us with an
791        // incomplete repo. Both branches therefore share the same completeness
792        // gate; when it refuses, we fall back to the correct (slower)
793        // empty-exclude full pull.
794        let advertised_head = if let Some(local_thread) = options.local_thread {
795            locally_complete_local_thread_head(repo, local_thread, options.target_state)?
796        } else {
797            locally_complete_pull_head(repo, remote_thread, options.target_state)?
798        };
799        if let Some(head) = advertised_head {
800            exclude_states.push(head);
801        }
802        let allow_partial_fetch = options.materialization.allows_partial_fetch();
803        let fresh_full_pull =
804            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
805
806        let transfer_id = pull_transfer_id(
807            repo_path,
808            remote_thread,
809            options.local_thread,
810            options.depth,
811            options.target_state,
812        );
813        let request_message = PullMessage {
814            body: Some(pull_message::Body::Request(PullRequest {
815                repo_path: repo_path.to_string(),
816                remote_thread: remote_thread.to_string(),
817                local_thread: options.local_thread.unwrap_or_default().to_string(),
818                target_state: options
819                    .target_state
820                    .map(|value| value.to_string_full())
821                    .unwrap_or_default(),
822                depth: options.depth.unwrap_or_default(),
823                exclude_states: exclude_states
824                    .iter()
825                    .map(ChangeId::to_string_full)
826                    .collect(),
827                transfer: Some(self.transport.transfer_checkpoint_with_mode(
828                    transfer_id.clone(),
829                    TransportMode::NativePack,
830                    0,
831                    0,
832                    false,
833                )),
834                partial_fetch_status: partial_fetch_status_for_repo(repo),
835                allow_partial_fetch,
836                fresh_full_pull,
837                target_revision_address: options
838                    .target_state
839                    .map(|state| RevisionAddress::heddle(state).to_string())
840                    .unwrap_or_default(),
841                client_operation_id: String::new(),
842            })),
843        };
844
845        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
846        tx.send(request_message).await.map_err(|_| {
847            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
848        })?;
849        let mut request = Request::new(ReceiverStream::new(rx));
850        self.apply_auth(&mut request)?;
851        let mut response = self
852            .inner
853            .pull(request)
854            .await
855            .map_err(status_to_protocol_error)?
856            .into_inner();
857
858        let ready = match response.message().await.map_err(status_to_protocol_error)? {
859            Some(PullMessage {
860                body: Some(pull_message::Body::Ready(ready)),
861            }) => ready,
862            _ => {
863                return Err(ProtocolError::InvalidState(
864                    "expected PullReady from gRPC server".to_string(),
865                ));
866            }
867        };
868        let mut profile = PullProfile {
869            ready_wait: exchange_start.elapsed(),
870            ..PullProfile::default()
871        };
872        let remote_state = ChangeId::parse(&ready.remote_state)
873            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
874        let advertised_object_count = ready.objects_to_fetch.len();
875        let PullWantPlan {
876            wants,
877            transfer_plan,
878            wanted_types,
879            want_full_closure,
880        } = plan_pull_wants(
881            repo,
882            &remote_state,
883            ready.full_closure_available,
884            ready.objects_to_fetch,
885            allow_partial_fetch,
886        )?;
887        let native_pack_required = native_pack_required_for_pull(want_full_closure, &transfer_plan);
888
889        tx.send(PullMessage {
890            body: Some(pull_message::Body::Want(WantObjects {
891                objects: wants.clone(),
892                want_full_closure,
893                transfer: Some(self.transport.transfer_checkpoint_with_mode(
894                    transfer_id.clone(),
895                    TransportMode::NativePack,
896                    0,
897                    0,
898                    false,
899                )),
900            })),
901        })
902        .await
903        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
904        drop(tx);
905
906        let receive_start = Instant::now();
907        let use_pack_spool = advertised_object_count > PULL_PACK_SPOOL_OBJECT_THRESHOLD;
908        let mut pack_state = wire::PackChunkState::default();
909        let mut pack_spool = if use_pack_spool {
910            Some(wire::PackChunkSpool::new_in(repo.heddle_dir())?)
911        } else {
912            None
913        };
914        let mut git_lane_repo = None;
915        let mut git_pack_state = GitPackPullInstallState::default();
916        let mut received = 0usize;
917        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
918            match message.body {
919                Some(pull_message::Body::Pack(chunk)) => {
920                    profile.bytes_received =
921                        profile.bytes_received.saturating_add(chunk.data.len());
922                    profile.pack_bytes_received =
923                        profile.pack_bytes_received.saturating_add(chunk.data.len());
924                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
925                        ProtocolError::InvalidState(
926                            "native pack chunk missing transfer checkpoint".to_string(),
927                        )
928                    })?;
929                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
930                        .unwrap_or(PackStreamKind::Unspecified);
931                    if stream_kind == PackStreamKind::Unspecified {
932                        return Err(ProtocolError::InvalidState(
933                            "native pack chunk missing stream kind".to_string(),
934                        ));
935                    }
936                    let decode_start = Instant::now();
937                    if let Some(pack_spool) = pack_spool.as_mut() {
938                        pack_spool.receive_chunk(
939                            stream_kind == PackStreamKind::Index,
940                            transfer.resume_offset,
941                            transfer.chunk_index,
942                            transfer.is_complete,
943                            &chunk.data,
944                            chunk.is_final_chunk,
945                        )?;
946                    } else {
947                        wire::receive_pack_chunk(
948                            &mut pack_state,
949                            stream_kind == PackStreamKind::Index,
950                            transfer.resume_offset,
951                            transfer.chunk_index,
952                            transfer.is_complete,
953                            &chunk.data,
954                            chunk.is_final_chunk,
955                        )?;
956                    }
957                    let decode_elapsed = decode_start.elapsed();
958                    profile.pack_decode += decode_elapsed;
959                    profile.pack_decode_apply += decode_elapsed;
960                    profile.decode += decode_elapsed;
961                }
962                Some(pull_message::Body::Redaction(transfer)) => {
963                    // Out-of-pack channel: receive a redaction sidecar
964                    // and route through `Repository::accept_wire_redactions`
965                    // for signature + trust-list verification. The
966                    // server emitted these only for blobs in our want
967                    // set that carry an active redaction.
968                    wire::check_received_transfer_blob_size(
969                        transfer.redactions_blob.len(),
970                        wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
971                        "redactions",
972                    )?;
973                    profile.bytes_received = profile
974                        .bytes_received
975                        .saturating_add(transfer.redactions_blob.len());
976                    profile.object_mix.record(ObjectType::Redaction);
977                    let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
978                        ProtocolError::InvalidState(format!(
979                            "RedactionTransfer.blob_hash is not a valid content hash: {err}"
980                        ))
981                    })?;
982                    let decode_start = Instant::now();
983                    repo.accept_wire_redactions(blob, &transfer.redactions_blob)
984                        .map_err(|err| {
985                            ProtocolError::InvalidState(format!(
986                                "accept_wire_redactions for blob {}: {err}",
987                                transfer.blob_hash
988                            ))
989                        })?;
990                    let decode_elapsed = decode_start.elapsed();
991                    profile.store_receive_object += decode_elapsed;
992                }
993                Some(pull_message::Body::StateVisibility(transfer)) => {
994                    wire::check_received_transfer_blob_size(
995                        transfer.state_visibility_blob.len(),
996                        wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
997                        "state-visibility",
998                    )?;
999                    profile.bytes_received = profile
1000                        .bytes_received
1001                        .saturating_add(transfer.state_visibility_blob.len());
1002                    profile.object_mix.record(ObjectType::StateVisibility);
1003                    let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
1004                        ProtocolError::InvalidState(format!(
1005                            "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
1006                        ))
1007                    })?;
1008                    let decode_start = Instant::now();
1009                    repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
1010                        .map_err(|err| {
1011                            ProtocolError::InvalidState(format!(
1012                                "accept_wire_state_visibility for state {}: {err}",
1013                                transfer.state_id
1014                            ))
1015                        })?;
1016                    let decode_elapsed = decode_start.elapsed();
1017                    profile.store_receive_object += decode_elapsed;
1018                }
1019                Some(pull_message::Body::GitLane(transfer)) => {
1020                    let decode_start = Instant::now();
1021                    profile.bytes_received = profile
1022                        .bytes_received
1023                        .saturating_add(git_lane_transfer_size(&transfer));
1024                    accept_git_lane_pull_transfer(
1025                        repo,
1026                        &mut git_lane_repo,
1027                        &mut git_pack_state,
1028                        transfer,
1029                    )?;
1030                    let decode_elapsed = decode_start.elapsed();
1031                    profile.store_receive_object += decode_elapsed;
1032                }
1033                Some(pull_message::Body::Complete(complete)) => {
1034                    profile.receive_and_apply = receive_start.elapsed();
1035                    git_pack_state.ensure_idle()?;
1036                    let final_state = if complete.new_state.is_empty() {
1037                        None
1038                    } else {
1039                        Some(
1040                            ChangeId::parse(&complete.new_state)
1041                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
1042                        )
1043                    };
1044
1045                    if complete.success {
1046                        if native_pack_required {
1047                            let store_start = Instant::now();
1048                            let installed_ids = if let Some(pack_spool) = pack_spool.as_mut() {
1049                                if !pack_spool.is_complete() {
1050                                    return Err(ProtocolError::InvalidState(
1051                                        "pull completed before native pack stream finished"
1052                                            .to_string(),
1053                                    ));
1054                                }
1055                                pack_spool.install_into(repo.store())?
1056                            } else {
1057                                if !pack_state.is_complete() {
1058                                    return Err(ProtocolError::InvalidState(
1059                                        "pull completed before native pack stream finished"
1060                                            .to_string(),
1061                                    ));
1062                                }
1063                                wire::install_received_pack(
1064                                    repo.store(),
1065                                    &pack_state.pack_data,
1066                                    &pack_state.index_data,
1067                                )?
1068                            };
1069                            profile.store_receive_object += store_start.elapsed();
1070                            received = installed_ids.len();
1071                            for id in installed_ids {
1072                                match (id, wanted_packable_type(&wanted_types, &id)) {
1073                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
1074                                        profile.object_mix.record(ObjectType::Blob);
1075                                        repo.clear_missing_blob(&hash)?;
1076                                    }
1077                                    (_, Some(obj_type)) => {
1078                                        profile.object_mix.record(obj_type);
1079                                    }
1080                                    (PackObjectId::ChangeId(_), None) => {
1081                                        profile.object_mix.record(ObjectType::State);
1082                                    }
1083                                    (PackObjectId::Hash(hash), None) => {
1084                                        let inferred =
1085                                            infer_installed_hash_object_type(repo, &hash)?;
1086                                        profile.object_mix.record(inferred);
1087                                    }
1088                                }
1089                            }
1090                        }
1091
1092                        let metadata_start = Instant::now();
1093                        if let Some(local_thread) = options.local_thread
1094                            && let Some(state) = final_state
1095                        {
1096                            repo.refs()
1097                                .set_thread(&ThreadName::from(local_thread), &state)?;
1098                        }
1099                        if let Some(state) = final_state
1100                            && allow_partial_fetch
1101                        {
1102                            mark_missing_blobs_for_state(repo, state)?;
1103                        } else if final_state.is_some() {
1104                            let _ = repo.clear_all_missing_blobs()?;
1105                        }
1106                        let synced_markers = complete
1107                            .transfer
1108                            .as_ref()
1109                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
1110                            .transpose()?
1111                            .unwrap_or(false);
1112                        if !synced_markers {
1113                            self.sync_local_markers(repo, repo_path).await?;
1114                        }
1115                        profile.metadata_sync = metadata_start.elapsed();
1116                        profile.objects_received = received;
1117                        return Ok(PullExchange {
1118                            result: PullComplete {
1119                                success: true,
1120                                final_state,
1121                                error: None,
1122                                transfer_id: complete
1123                                    .transfer
1124                                    .as_ref()
1125                                    .map(|transfer| transfer.transfer_id.clone())
1126                                    .unwrap_or_default(),
1127                                transport_mode: complete
1128                                    .transfer
1129                                    .as_ref()
1130                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
1131                                    .unwrap_or("native-pack")
1132                                    .to_string(),
1133                                resume_offset: complete
1134                                    .transfer
1135                                    .as_ref()
1136                                    .map(|transfer| transfer.resume_offset)
1137                                    .unwrap_or_default(),
1138                                chunk_index: complete
1139                                    .transfer
1140                                    .as_ref()
1141                                    .map(|transfer| transfer.chunk_index)
1142                                    .unwrap_or_default(),
1143                                checkpoint: complete
1144                                    .transfer
1145                                    .as_ref()
1146                                    .map(|transfer| transfer.checkpoint.clone())
1147                                    .unwrap_or_default(),
1148                                is_complete: complete
1149                                    .transfer
1150                                    .as_ref()
1151                                    .map(|transfer| transfer.is_complete)
1152                                    .unwrap_or(false),
1153                            },
1154                            object_count: received,
1155                            profile,
1156                        });
1157                    }
1158
1159                    profile.objects_received = received;
1160                    return Ok(PullExchange {
1161                        result: PullComplete {
1162                            success: false,
1163                            final_state,
1164                            error: (!complete.error.is_empty()).then_some(complete.error),
1165                            transfer_id: complete
1166                                .transfer
1167                                .as_ref()
1168                                .map(|transfer| transfer.transfer_id.clone())
1169                                .unwrap_or_default(),
1170                            transport_mode: complete
1171                                .transfer
1172                                .as_ref()
1173                                .map(|transfer| transport_mode_name(transfer.transport_mode))
1174                                .unwrap_or("native-pack")
1175                                .to_string(),
1176                            resume_offset: complete
1177                                .transfer
1178                                .as_ref()
1179                                .map(|transfer| transfer.resume_offset)
1180                                .unwrap_or_default(),
1181                            chunk_index: complete
1182                                .transfer
1183                                .as_ref()
1184                                .map(|transfer| transfer.chunk_index)
1185                                .unwrap_or_default(),
1186                            checkpoint: complete
1187                                .transfer
1188                                .as_ref()
1189                                .map(|transfer| transfer.checkpoint.clone())
1190                                .unwrap_or_default(),
1191                            is_complete: complete
1192                                .transfer
1193                                .as_ref()
1194                                .map(|transfer| transfer.is_complete)
1195                                .unwrap_or(false),
1196                        },
1197                        object_count: received,
1198                        profile,
1199                    });
1200                }
1201                _ => {}
1202            }
1203        }
1204
1205        Err(ProtocolError::InvalidState(format!(
1206            "pull stream ended unexpectedly after receiving {received} packed objects"
1207        )))
1208    }
1209}
1210
1211fn redaction_push_message(
1212    repo: &Repository,
1213    info: wire::ObjectInfo,
1214) -> Result<PushMessage, ProtocolError> {
1215    let wire::ObjectId::Hash(blob) = info.id else {
1216        return Err(ProtocolError::InvalidState(
1217            "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
1218        ));
1219    };
1220    let hex = blob.to_hex();
1221    // Sender-side: load the byte-identical sidecar payload
1222    // that `Repository::put_redaction` wrote to disk. The
1223    // receiver verifies the signature + trust list and then
1224    // persists these bytes verbatim.
1225    let bytes = repo
1226        .store()
1227        .get_redactions_bytes_for_blob(&blob)
1228        .map_err(|err| {
1229            ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
1230        })?
1231        .ok_or_else(|| {
1232            ProtocolError::InvalidState(format!(
1233                "server wants redaction for blob {} but sender has no sidecar",
1234                hex
1235            ))
1236        })?;
1237    Ok(PushMessage {
1238        body: Some(push_message::Body::Redaction(RedactionTransfer {
1239            blob_hash: hex,
1240            redactions_blob: bytes.into(),
1241        })),
1242    })
1243}
1244
1245fn native_pack_required_for_pull(
1246    want_full_closure: bool,
1247    transfer_plan: &RepositoryTransferPlan<ObjectInfo>,
1248) -> bool {
1249    transfer_plan.requires_native_pack(want_full_closure)
1250}
1251
1252fn object_info_from_plan(object: &PlannedObject) -> ObjectInfo {
1253    ObjectInfo {
1254        id: object.id.clone(),
1255        obj_type: object.obj_type,
1256        size: 0,
1257        delta_base: None,
1258    }
1259}
1260
1261fn to_proto_planned_object(object: &PlannedObject) -> ObjectDescriptor {
1262    object_descriptor_with_status(
1263        &object_info_from_plan(object),
1264        ObjectAvailabilityStatus::Present,
1265        "",
1266    )
1267}
1268
1269fn descriptor_id_from_plan(object: &PlannedObject) -> (String, String) {
1270    let id = match &object.id {
1271        wire::ObjectId::Hash(hash) => hash.to_hex(),
1272        wire::ObjectId::ChangeId(change_id) => change_id.to_string_full(),
1273    };
1274    (id, object_type_name(object.obj_type).to_string())
1275}
1276
1277fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
1278    let types = wanted_types.entry(pack_id).or_default();
1279    if !types.contains(&obj_type) {
1280        types.push(obj_type);
1281    }
1282}
1283
1284fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
1285    wanted_types.get(pack_id).and_then(|types| {
1286        types
1287            .iter()
1288            .copied()
1289            .find(|obj_type| obj_type.packable())
1290    })
1291}
1292
1293fn sidecar_push_message(
1294    repo: &Repository,
1295    info: wire::ObjectInfo,
1296) -> Result<PushMessage, ProtocolError> {
1297    match info.obj_type {
1298        ObjectType::Redaction => redaction_push_message(repo, info),
1299        ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1300        obj_type => Err(ProtocolError::InvalidState(format!(
1301            "{obj_type:?} is not an out-of-pack sidecar object"
1302        ))),
1303    }
1304}
1305
1306fn state_visibility_push_message(
1307    repo: &Repository,
1308    info: wire::ObjectInfo,
1309) -> Result<PushMessage, ProtocolError> {
1310    let wire::ObjectId::ChangeId(state) = info.id else {
1311        return Err(ProtocolError::InvalidState(
1312            "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1313        ));
1314    };
1315    let state_id = state.to_string_full();
1316    let bytes = repo
1317        .get_state_visibility_bytes_for_state(&state)
1318        .map_err(|err| {
1319            ProtocolError::InvalidState(format!(
1320                "load state-visibility sidecar for {}: {err}",
1321                state_id
1322            ))
1323        })?
1324        .ok_or_else(|| {
1325            ProtocolError::InvalidState(format!(
1326                "server wants state visibility for state {} but sender has no sidecar",
1327                state_id
1328            ))
1329        })?;
1330    Ok(PushMessage {
1331        body: Some(push_message::Body::StateVisibility(
1332            StateVisibilityTransfer {
1333                state_id,
1334                state_visibility_blob: bytes.into(),
1335            },
1336        )),
1337    })
1338}
1339
1340fn load_thread_metadata(
1341    repo: &Repository,
1342    target_thread: &str,
1343    local_state: ChangeId,
1344) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1345    let thread_manager = ThreadManager::new(repo.heddle_dir());
1346    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1347}
1348
1349fn plan_pull_wants(
1350    repo: &Repository,
1351    remote_state: &ChangeId,
1352    full_closure_available: bool,
1353    objects_to_fetch: Vec<ObjectDescriptor>,
1354    allow_partial_fetch: bool,
1355) -> Result<PullWantPlan, ProtocolError> {
1356    if full_closure_available {
1357        return Ok(PullWantPlan {
1358            wants: Vec::new(),
1359            transfer_plan: RepositoryTransferPlan::from_object_infos(
1360                Vec::<ObjectInfo>::new(),
1361                GitLaneTransferIntent::HeddleObjectsOnly,
1362            ),
1363            wanted_types: HashMap::new(),
1364            want_full_closure: true,
1365        });
1366    }
1367    let request_full_closure =
1368        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1369    let mut wants = Vec::with_capacity(objects_to_fetch.len());
1370    let mut wanted_infos = Vec::with_capacity(objects_to_fetch.len());
1371    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1372
1373    for descriptor in objects_to_fetch {
1374        let info = parse_descriptor_to_info(descriptor)?;
1375        let pack_id = match &info.id {
1376            wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1377            wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1378        };
1379        let include = if request_full_closure {
1380            true
1381        } else {
1382            let has = wire::has_object(repo.store(), &info)?;
1383            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1384        };
1385
1386        if include {
1387            record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1388            wants.push(object_descriptor_with_status(
1389                &info,
1390                ObjectAvailabilityStatus::Missing,
1391                "requested by client",
1392            ));
1393            wanted_infos.push(info);
1394        }
1395    }
1396
1397    Ok(PullWantPlan {
1398        wants,
1399        transfer_plan: RepositoryTransferPlan::from_object_infos(
1400            wanted_infos,
1401            GitLaneTransferIntent::HeddleObjectsOnly,
1402        ),
1403        wanted_types,
1404        want_full_closure: false,
1405    })
1406}
1407
1408fn supports_compact_full_pull(
1409    repo: &Repository,
1410    allow_partial_fetch: bool,
1411    exclude_states: &[ChangeId],
1412) -> Result<bool, ProtocolError> {
1413    if allow_partial_fetch || !exclude_states.is_empty() {
1414        return Ok(false);
1415    }
1416    repo_looks_fresh(repo)
1417}
1418
1419/// For a bare `pull` (no explicit `--local-thread`), determine which locally
1420/// held head — if any — is safe to advertise to the server as an
1421/// `exclude_states` entry so the server prunes its closure to the delta.
1422///
1423/// Advertising state S asserts "I already hold S's FULL object closure
1424/// locally." If we advertise a head whose closure we do NOT fully have, the
1425/// server omits those objects and we silently end up with an incomplete repo.
1426/// The server trusts this assertion blindly, so the entire correctness burden
1427/// is here. We therefore advertise a head ONLY when:
1428///
1429/// 1. A target-state override is not in play (the override drives the want
1430///    plan directly; advertising the thread head would be unrelated).
1431/// 2. The local repo holds no recorded missing blobs (a partial/lazy clone
1432///    can hold a state's metadata while its blobs were never fetched — never
1433///    advertise such a head).
1434/// 3. The thread we're about to update (`remote_thread`) resolves to a local
1435///    head whose ENTIRE object closure is present locally — proven by walking
1436///    it with `enumerate_state_closure`, which errors `ObjectNotFound` on the
1437///    first absent state/tree/blob.
1438///
1439/// When any check fails we return `None` and the caller falls back to the
1440/// (correct, just slower) empty-exclude full pull. Correctness > speed.
1441fn locally_complete_pull_head(
1442    repo: &Repository,
1443    remote_thread: &str,
1444    target_state: Option<ChangeId>,
1445) -> Result<Option<ChangeId>, ProtocolError> {
1446    locally_complete_thread_head(repo, remote_thread, target_state)
1447}
1448
1449/// Same completeness gate for an explicit `--local-thread`: the user named the
1450/// local thread whose head should be advertised as already-held. The cardinal
1451/// risk is identical to the bare path — advertising a head whose closure we do
1452/// NOT fully hold makes the server prune objects we lack and silently leaves us
1453/// with an incomplete repo. A `--local-thread` pointed at a partial/lazy clone
1454/// or an interrupted prior pull is exactly that hazard, so it must clear the
1455/// same checks (no target-state override, no recorded missing blobs, full
1456/// closure present) before it may be advertised.
1457fn locally_complete_local_thread_head(
1458    repo: &Repository,
1459    local_thread: &str,
1460    target_state: Option<ChangeId>,
1461) -> Result<Option<ChangeId>, ProtocolError> {
1462    locally_complete_thread_head(repo, local_thread, target_state)
1463}
1464
1465/// Shared completeness gate: given the name of a thread whose head we are about
1466/// to advertise as an `exclude_states` entry, return that head ONLY when its
1467/// full object closure is provably present locally; otherwise `None` (caller
1468/// falls back to the correct, slower empty-exclude full pull).
1469///
1470/// Advertising state S asserts "I already hold S's FULL object closure
1471/// locally." If we advertise a head whose closure we do NOT fully have, the
1472/// server omits those objects and we silently end up with an incomplete repo.
1473/// The server trusts this assertion blindly, so the entire correctness burden
1474/// is here. We therefore advertise a head ONLY when:
1475///
1476/// 1. A target-state override is not in play (the override drives the want
1477///    plan directly; advertising the thread head would be unrelated).
1478/// 2. The local repo holds no recorded missing blobs (a partial/lazy clone
1479///    can hold a state's metadata while its blobs were never fetched — never
1480///    advertise such a head).
1481/// 3. The named thread resolves to a local head whose ENTIRE object closure is
1482///    present locally — proven by walking it with `enumerate_state_closure`,
1483///    which errors `ObjectNotFound` on the first absent state/tree/blob.
1484fn locally_complete_thread_head(
1485    repo: &Repository,
1486    thread: &str,
1487    target_state: Option<ChangeId>,
1488) -> Result<Option<ChangeId>, ProtocolError> {
1489    // A target-state override pulls a specific state, not the thread tip;
1490    // advertising the thread head here would not match what's being fetched.
1491    if target_state.is_some() {
1492        return Ok(None);
1493    }
1494    // A repo carrying known-missing blobs is partial/lazy: it may hold a
1495    // state's metadata while lacking its blob content. Never advertise.
1496    if !repo.missing_blobs()?.is_empty() {
1497        return Ok(None);
1498    }
1499    let Some(head) = repo.refs().get_thread(&ThreadName::from(thread))? else {
1500        // Fresh local repo (no local head for this thread) — nothing to
1501        // advertise; the server sends the full closure as before.
1502        return Ok(None);
1503    };
1504    // Prove the head's closure is fully present locally. `enumerate_state_closure`
1505    // loads every state, tree, and blob in the closure and errors `ObjectNotFound`
1506    // on the first absent object. A clean `Ok` is the completeness guarantee that
1507    // makes advertising this head safe.
1508    match wire::enumerate_state_closure(repo.store(), head) {
1509        Ok(_) => Ok(Some(head)),
1510        Err(ProtocolError::ObjectNotFound(_)) => Ok(None),
1511        Err(err) => Err(err),
1512    }
1513}
1514
1515fn should_request_full_closure(
1516    repo: &Repository,
1517    remote_state: &ChangeId,
1518    allow_partial_fetch: bool,
1519) -> Result<bool, ProtocolError> {
1520    if allow_partial_fetch || repo.store().has_state(remote_state)? {
1521        return Ok(false);
1522    }
1523    repo_looks_fresh(repo)
1524}
1525
1526fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1527    if repo.head()?.is_some() {
1528        return Ok(false);
1529    }
1530    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1531        return Ok(false);
1532    }
1533    Ok(repo.missing_blobs()?.is_empty())
1534}
1535
1536fn infer_installed_hash_object_type(
1537    repo: &Repository,
1538    hash: &ContentHash,
1539) -> Result<ObjectType, ProtocolError> {
1540    let store = repo.store();
1541    if store.get_tree(hash)?.is_some() {
1542        return Ok(ObjectType::Tree);
1543    }
1544    if store
1545        .get_action(&objects::object::ActionId::from_hash(*hash))?
1546        .is_some()
1547    {
1548        return Ok(ObjectType::Action);
1549    }
1550    Ok(ObjectType::Blob)
1551}
1552
1553fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1554    const HEADER: &str = "heddle-markers-v1\n";
1555    if checkpoint.is_empty() {
1556        return Ok(false);
1557    }
1558    let payload = std::str::from_utf8(checkpoint)
1559        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1560    let Some(lines) = payload.strip_prefix(HEADER) else {
1561        return Ok(false);
1562    };
1563
1564    for line in lines.lines() {
1565        if line.is_empty() {
1566            continue;
1567        }
1568        let Some((name, change_id)) = line.split_once('\t') else {
1569            return Err(ProtocolError::InvalidState(
1570                "invalid marker snapshot line".to_string(),
1571            ));
1572        };
1573        let change_id = ChangeId::parse(change_id)
1574            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1575        if !repo.store().has_state(&change_id)? {
1576            continue;
1577        }
1578        let name = MarkerName::from(name);
1579        match repo.refs().get_marker(&name)? {
1580            Some(existing) if existing == change_id => {}
1581            Some(existing) => repo.refs().set_marker_cas(
1582                &name,
1583                refs::RefExpectation::Value(existing),
1584                &change_id,
1585            )?,
1586            None => repo.refs().create_marker(&name, &change_id)?,
1587        }
1588    }
1589
1590    Ok(true)
1591}
1592
1593fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1594    if s.is_empty() {
1595        return Vec::new();
1596    }
1597    objects::object::ChangeId::parse(s)
1598        .map(|id| id.as_bytes().to_vec())
1599        .unwrap_or_default()
1600}
1601
1602fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1603    ThreadMetadata {
1604        name: metadata.thread.clone(),
1605        target_thread: metadata.target_thread.clone(),
1606        parent_thread: metadata.parent_thread.clone(),
1607        task: metadata.task.clone(),
1608        thread_mode: metadata.mode.to_string(),
1609        thread_state: metadata.state.to_string(),
1610        freshness: metadata.freshness.to_string(),
1611        base_state: change_id_string_to_bytes(&metadata.base_state),
1612        base_root: change_id_string_to_bytes(&metadata.base_root),
1613        current_state: metadata
1614            .current_state
1615            .as_deref()
1616            .map(change_id_string_to_bytes),
1617        merged_state: metadata
1618            .merged_state
1619            .as_deref()
1620            .map(change_id_string_to_bytes),
1621        changed_paths: metadata.changed_paths.clone(),
1622        impact_categories: metadata
1623            .impact_categories
1624            .iter()
1625            .map(ToString::to_string)
1626            .collect(),
1627        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1628        promotion_suggested: metadata.promotion_suggested,
1629        verification_summary: Some(ThreadVerificationSummary {
1630            tests_passed: metadata.verification_summary.tests_passed,
1631            tests_failed: metadata
1632                .verification_summary
1633                .tests_failed
1634                .unwrap_or_default(),
1635            coverage_pct: metadata.verification_summary.coverage_pct,
1636            lint_warnings: metadata.verification_summary.lint_warnings,
1637        }),
1638        confidence_summary: Some(ThreadConfidenceSummary {
1639            value: metadata.confidence_summary.value,
1640            band: metadata
1641                .confidence_summary
1642                .band
1643                .as_ref()
1644                .map(ToString::to_string),
1645        }),
1646        integration_policy_result: Some(ThreadIntegrationPolicy {
1647            status: metadata
1648                .integration_policy_result
1649                .status
1650                .clone()
1651                .unwrap_or_default(),
1652            reason: metadata
1653                .integration_policy_result
1654                .reason
1655                .clone()
1656                .unwrap_or_default(),
1657        }),
1658        created_at: Some(prost_types::Timestamp {
1659            seconds: metadata.created_at.timestamp(),
1660            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1661        }),
1662        updated_at: Some(prost_types::Timestamp {
1663            seconds: metadata.updated_at.timestamp(),
1664            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1665        }),
1666    }
1667}
1668
/// Values produced by one pull exchange, bundled for the caller.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types only — confirm against the code that builds this struct.
struct PullExchange {
    /// The `PullComplete` record for the exchange.
    result: PullComplete,
    /// Object count for the exchange — presumably objects received; verify.
    object_count: usize,
    /// The `PullProfile` gathered for the exchange — presumably
    /// instrumentation; verify.
    profile: PullProfile,
}
1674
1675fn mark_missing_blobs_for_state(
1676    repo: &Repository,
1677    state_id: ChangeId,
1678) -> Result<(), ProtocolError> {
1679    let state = repo
1680        .store()
1681        .get_state(&state_id)?
1682        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1683    let mut missing = wire::missing_blobs_in_tree(repo.store(), state.tree)?;
1684    if let Some(context_root) = state.context.as_ref() {
1685        missing.extend(wire::missing_blobs_in_tree(repo.store(), *context_root)?);
1686    }
1687    if let Some(discussions_blob) = state.discussions.as_ref()
1688        && !repo.store().has_blob(discussions_blob)?
1689    {
1690        missing.push(*discussions_blob);
1691    }
1692    missing
1693        .into_iter()
1694        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1695}
1696
1697fn clear_missing_blobs_for_state(
1698    repo: &Repository,
1699    state_id: ChangeId,
1700) -> Result<(), ProtocolError> {
1701    let state = repo
1702        .store()
1703        .get_state(&state_id)?
1704        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1705    let mut missing = wire::missing_blobs_in_tree(repo.store(), state.tree)?;
1706    if let Some(context_root) = state.context.as_ref() {
1707        missing.extend(wire::missing_blobs_in_tree(repo.store(), *context_root)?);
1708    }
1709    if let Some(discussions_blob) = state.discussions.as_ref() {
1710        missing.push(*discussions_blob);
1711    }
1712    missing
1713        .into_iter()
1714        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1715}
1716
1717fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1718    match repo.missing_blobs() {
1719        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1720        Ok(_) => PartialFetchStatus::Disabled as i32,
1721        Err(_) => PartialFetchStatus::Unspecified as i32,
1722    }
1723}
1724
1725fn pull_transfer_id(
1726    repo_path: &str,
1727    remote_thread: &str,
1728    local_thread: Option<&str>,
1729    depth: Option<u32>,
1730    target_state: Option<ChangeId>,
1731) -> String {
1732    format!(
1733        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1734        local_thread.unwrap_or_default(),
1735        target_state
1736            .map(|value| value.to_string_full())
1737            .unwrap_or_default()
1738    )
1739}
1740
1741fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1742    format!(
1743        "push:{repo_path}:{}:{target_thread}",
1744        local_state.to_string_full()
1745    )
1746}
1747
1748/// Build the git-mirror push plan: read ALL refs from the git ODB, resolve
1749/// each to its object oid, build ONE multi-root pack over the resolved
1750/// targets, and emit N checkpoint-less `GitRefUpdateTransfer` messages.
1751///
1752/// `remote_ref_expectations` maps each server-side ref name to the git
1753/// revision address the server currently holds for it (from `ListRefs`);
1754/// unlisted refs are treated as expected-missing (create). Callers fetch
1755/// this map before building the plan — the builder itself is synchronous so
1756/// it can be exercised without a live server.
1757///
1758/// The signal for mirror mode is `checkpoint: None` on every ref update
1759/// (plan §B.4) — do NOT change this discriminator without the matching weft
1760/// server change (`feat/git-mirror-ref-scope`).
1761fn build_git_mirror_push_plan(
1762    repo: &Repository,
1763    chunk_size: usize,
1764    remote_ref_expectations: &HashMap<String, GitRefRemoteExpectation>,
1765) -> Result<GitLanePushPlan, ProtocolError> {
1766    if repo.capability() != RepositoryCapability::GitOverlay {
1767        return Err(ProtocolError::InvalidState(
1768            "Git mirror pushes require a git-overlay repository".to_string(),
1769        ));
1770    }
1771    let git_repo = repo
1772        .git_overlay_sley_repository()
1773        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1774        .ok_or_else(|| {
1775            ProtocolError::InvalidState("git-overlay repository has no Git store".to_string())
1776        })?;
1777    build_git_mirror_plan_from_sley(&git_repo, chunk_size, remote_ref_expectations)
1778}
1779
1780/// Core of the mirror plan builder, operating directly on a sley repository
1781/// so it is unit-testable without a hosted `Repository` façade.
1782fn build_git_mirror_plan_from_sley(
1783    git_repo: &SleyRepository,
1784    chunk_size: usize,
1785    remote_ref_expectations: &HashMap<String, GitRefRemoteExpectation>,
1786) -> Result<GitLanePushPlan, ProtocolError> {
1787    let refs = git_repo
1788        .references()
1789        .list_refs()
1790        .map_err(|err| ProtocolError::InvalidState(format!("list git-overlay refs: {err}")))?;
1791
1792    let mut roots: Vec<GitObjectId> = Vec::new();
1793    let mut ref_updates: Vec<PushMessage> = Vec::new();
1794    let mut newest_root: Option<GitObjectId> = None;
1795
1796    for reference in refs {
1797        // Local-only bookkeeping refs must NOT ship to the hosted server now
1798        // that the mirror path is the DEFAULT `heddle push` (#846). These four
1799        // namespaces are purely local git machinery, never content:
1800        //   - refs/stash       : the stash reflog stack (local WIP)
1801        //   - refs/remotes/*    : this clone's remote-tracking refs (the
1802        //                         server has its own view of remotes)
1803        //   - refs/original/*   : filter-branch/-repo backups (local undo)
1804        //   - refs/replace/*    : local object replacements (grafts)
1805        // Excluding them BEFORE the readability check below also means a
1806        // single dangling/unreadable ref in one of these namespaces (e.g. a
1807        // stale `refs/original/*` backup) no longer fails the whole push.
1808        // Content refs — refs/heads/*, refs/tags/*, refs/notes/* (incl.
1809        // heddle's `refs/notes/heddle` state metadata) — are kept.
1810        if GitRefName::new(&reference.name).is_local_only() {
1811            continue;
1812        }
1813
1814        // Only direct refs are pushable ref updates. Symbolic refs (e.g.
1815        // `HEAD`) name another ref that is itself pushed separately; sending
1816        // a ref update for the symbolic name would push the pointed-at oid
1817        // under the wrong name.
1818        let target_oid = match reference.target {
1819            ReferenceTarget::Direct(oid) => oid,
1820            ReferenceTarget::Symbolic(_) => continue,
1821        };
1822
1823        // Verify the target object is present in the ODB and learn whether
1824        // it is a tag (so we can populate `peeled_oid`). A dangling ref
1825        // whose target object is missing cannot be packed — surface it.
1826        let object = git_repo.read_object(&target_oid).map_err(|err| {
1827            ProtocolError::InvalidState(format!(
1828                "git-overlay ref {} target {} is not readable: {err}",
1829                reference.name,
1830                target_oid.to_hex()
1831            ))
1832        })?;
1833
1834        let peeled_oid = if object.object_type == sley::GitObjectType::Tag {
1835            let peeled = git_repo.peel_to_object_oid(target_oid).map_err(|err| {
1836                ProtocolError::InvalidState(format!(
1837                    "peel git-overlay tag ref {}: {err}",
1838                    reference.name
1839                ))
1840            })?;
1841            Some(peeled)
1842        } else {
1843            None
1844        };
1845
1846        // The pack root is the ref's direct target: packing a tag oid pulls
1847        // the tag object AND its referent closure; packing a commit pulls
1848        // that commit's closure.
1849        roots.push(target_oid);
1850        newest_root.get_or_insert(target_oid);
1851
1852        let expectation = remote_ref_expectations
1853            .get(&reference.name)
1854            .cloned()
1855            .unwrap_or(GitRefRemoteExpectation::Missing);
1856
1857        let kind = grpc_git_ref_kind(GitRefName::new(&reference.name).wire_kind());
1858        let mut message =
1859            git_ref_update_message(&reference.name, kind, target_oid, peeled_oid, None);
1860        apply_git_ref_expectation_value(&mut message, &expectation)?;
1861        ref_updates.push(message);
1862    }
1863
1864    if roots.is_empty() {
1865        return Err(ProtocolError::InvalidState(
1866            "git-overlay repository has no direct refs to mirror".to_string(),
1867        ));
1868    }
1869
1870    let pack = build_git_lane_multi_root_pack_plan(git_repo, roots, chunk_size)?;
1871
1872    // `local_revision_address` is advisory in mirror mode (per-ref
1873    // expectations are already applied); use the first resolved target.
1874    let local_revision_address = newest_root
1875        .map(|oid| RevisionAddress::git_commit(oid.to_hex()).to_string())
1876        .unwrap_or_default();
1877
1878    Ok(GitLanePushPlan {
1879        local_revision_address,
1880        pack,
1881        ref_updates,
1882    })
1883}
1884
1885fn grpc_git_ref_kind(kind: ClassifiedGitRefKind) -> GrpcGitRefKind {
1886    match kind {
1887        ClassifiedGitRefKind::Branch => GrpcGitRefKind::Branch,
1888        ClassifiedGitRefKind::Tag => GrpcGitRefKind::Tag,
1889        ClassifiedGitRefKind::Note => GrpcGitRefKind::Note,
1890        ClassifiedGitRefKind::Other => GrpcGitRefKind::Other,
1891    }
1892}
1893
1894/// Plan a single pack over `roots` (N for the git-mirror path).
1895fn build_git_lane_multi_root_pack_plan(
1896    git_repo: &SleyRepository,
1897    roots: Vec<GitObjectId>,
1898    chunk_size: usize,
1899) -> Result<GitPackPushPlan, ProtocolError> {
1900    if roots.is_empty() {
1901        return Err(ProtocolError::InvalidState(
1902            "cannot plan a Git pack with no roots".to_string(),
1903        ));
1904    }
1905    let plan = git_repo
1906        .reachable_pack_plan()
1907        .roots(roots.iter().copied())
1908        .build()
1909        .map_err(|err| {
1910            ProtocolError::InvalidState(format!("plan reachable Git pack stream: {err}"))
1911        })?
1912        .ok_or_else(|| {
1913            ProtocolError::InvalidState("roots did not produce a reachable Git pack".to_string())
1914        })?;
1915    let pack_file = NamedTempFile::new().map_err(|err| {
1916        ProtocolError::InvalidState(format!("create Git pack tempfile: {err}"))
1917    })?;
1918    let prepared = plan
1919        .prepare_to_file(pack_file.path())
1920        .map_err(|err| {
1921            ProtocolError::InvalidState(format!("write reachable Git pack tempfile: {err}"))
1922        })?;
1923    let pack_size = prepared.summary.pack_size;
1924    let checksum = prepared.summary.checksum;
1925    if pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
1926        return Err(ProtocolError::InvalidState(format!(
1927            "Git pack exceeds maximum transfer size of {} bytes; multi-pack split for repos over this size is a follow-up (plan §B.2)",
1928            wire::MAX_RECEIVED_GIT_PACK_SIZE
1929        )));
1930    }
1931    let chunk_size = chunk_size.max(1) as u64;
1932    let chunk_count = pack_size.div_ceil(chunk_size);
1933    if chunk_count > u32::MAX as u64 {
1934        return Err(ProtocolError::InvalidState(
1935            "Git pack chunk count exceeds u32".to_string(),
1936        ));
1937    }
1938    let transfer_id = format!("git-pack:{}", checksum.to_hex());
1939    Ok(GitPackPushPlan {
1940        transfer_id,
1941        pack_id: checksum.as_bytes().to_vec(),
1942        pack_size,
1943        roots,
1944        pack_file: Arc::new(Mutex::new(pack_file)),
1945    })
1946}
1947
1948async fn send_git_pack_streaming_messages(
1949    tx: &mpsc::Sender<PushMessage>,
1950    pack: &GitPackPushPlan,
1951    chunk_size: usize,
1952    progress: &Progress,
1953) -> Result<(), ProtocolError> {
1954    let tx = tx.clone();
1955    let pack = pack.clone();
1956    let progress = progress.clone();
1957    tokio::task::spawn_blocking(move || {
1958        stream_git_pack_messages_blocking(tx, pack, chunk_size, progress)
1959    })
1960    .await
1961    .map_err(|err| ProtocolError::InvalidState(format!("Git pack streaming task failed: {err}")))?
1962}
1963
1964fn stream_git_pack_messages_blocking(
1965    tx: mpsc::Sender<PushMessage>,
1966    pack: GitPackPushPlan,
1967    chunk_size: usize,
1968    progress: Progress,
1969) -> Result<(), ProtocolError> {
1970    let mut writer = GitPackPushMessageWriter::new(
1971        tx,
1972        pack.transfer_id.clone(),
1973        pack.pack_id.clone(),
1974        pack.pack_size,
1975        chunk_size,
1976        progress,
1977    );
1978    let mut pack_file = pack.pack_file.lock().map_err(|err| {
1979        ProtocolError::InvalidState(format!("lock Git pack tempfile: {err}"))
1980    })?;
1981    pack_file
1982        .seek(SeekFrom::Start(0))
1983        .map_err(|err| ProtocolError::InvalidState(format!("rewind Git pack tempfile: {err}")))?;
1984    let streamed = io::copy(&mut pack_file.as_file_mut(), &mut writer).map_err(|err| {
1985        ProtocolError::InvalidState(format!("stream Git pack tempfile: {err}"))
1986    })?;
1987    if streamed != pack.pack_size {
1988        return Err(ProtocolError::InvalidState(format!(
1989            "Git pack stream changed while sending; expected {} bytes/{}, streamed {} bytes",
1990            pack.pack_size,
1991            hex::encode(&pack.pack_id),
1992            streamed
1993        )));
1994    }
1995    writer.finish()?;
1996    Ok(())
1997}
1998
/// `io::Write` adapter that turns a byte stream into chunked
/// `GitPackTransfer` push messages.
///
/// Bytes are accumulated in `buffer` and sent on `tx` whenever a full chunk
/// is available; `finish` sends the remaining tail and checks the total
/// against `pack_size`.
struct GitPackPushMessageWriter {
    /// Channel the chunk messages are sent on (with a blocking send).
    tx: mpsc::Sender<PushMessage>,
    /// Transfer id copied into every chunk message.
    transfer_id: String,
    /// Pack id bytes copied into every chunk message.
    pack_id: Vec<u8>,
    /// Planned total size of the pack in bytes; the stream may not exceed it.
    pack_size: u64,
    /// Maximum number of bytes per chunk message (at least 1).
    chunk_size: usize,
    /// Bytes written but not yet sent.
    buffer: Vec<u8>,
    /// Number of bytes already sent; the offset of the next chunk.
    offset: u64,
    /// Index of the next chunk to send.
    chunk_index: u32,
    /// Live "uploading N/M bytes" progress, driven per flushed chunk. A null
    /// handle (`--output json` / non-TTY) makes every update a no-op.
    progress: Progress,
    /// Last integer percent painted, so the "uploading" phase line repaints at
    /// most ~101 times regardless of chunk count. `u64::MAX` forces the first
    /// chunk to paint.
    last_progress_pct: u64,
}
2016
2017impl GitPackPushMessageWriter {
2018    fn new(
2019        tx: mpsc::Sender<PushMessage>,
2020        transfer_id: String,
2021        pack_id: Vec<u8>,
2022        pack_size: u64,
2023        chunk_size: usize,
2024        progress: Progress,
2025    ) -> Self {
2026        let chunk_size = chunk_size.max(1);
2027        Self {
2028            tx,
2029            transfer_id,
2030            pack_id,
2031            pack_size,
2032            chunk_size,
2033            buffer: Vec::with_capacity(chunk_size),
2034            offset: 0,
2035            chunk_index: 0,
2036            progress,
2037            last_progress_pct: u64::MAX,
2038        }
2039    }
2040
2041    fn send_buffer(&mut self) -> io::Result<()> {
2042        if self.buffer.is_empty() {
2043            return Ok(());
2044        }
2045        let chunk = std::mem::take(&mut self.buffer);
2046        let next_offset = self.offset.checked_add(chunk.len() as u64).ok_or_else(|| {
2047            io::Error::new(io::ErrorKind::InvalidData, "Git pack offset overflow")
2048        })?;
2049        if next_offset > self.pack_size {
2050            return Err(io::Error::new(
2051                io::ErrorKind::InvalidData,
2052                format!(
2053                    "Git pack stream exceeded planned size {}; got at least {}",
2054                    self.pack_size, next_offset
2055                ),
2056            ));
2057        }
2058        let is_final_chunk = next_offset == self.pack_size;
2059        self.tx
2060            .blocking_send(git_lane_push_message(git_lane_transfer::Body::Pack(
2061                GitPackTransfer {
2062                    transfer_id: self.transfer_id.clone(),
2063                    offset: self.offset,
2064                    chunk_index: self.chunk_index,
2065                    is_final_chunk,
2066                    pack_size: self.pack_size,
2067                    pack_chunk: chunk.into(),
2068                    pack_id: self.pack_id.clone().into(),
2069                },
2070            )))
2071            .map_err(|_| {
2072                io::Error::new(io::ErrorKind::BrokenPipe, "push stream closed unexpectedly")
2073            })?;
2074        self.offset = next_offset;
2075        self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
2076            io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk index overflow")
2077        })?;
2078        self.report_upload_progress();
2079        Ok(())
2080    }
2081
2082    /// Paint the live "uploading N/M bytes" line for the bytes flushed so far,
2083    /// throttled to one repaint per integer percent so a large pack does not
2084    /// spend its time formatting. A null (`--output json` / non-TTY) handle
2085    /// short-circuits before any formatting.
2086    fn report_upload_progress(&mut self) {
2087        if !self.progress.is_active() {
2088            return;
2089        }
2090        let pct = self.offset.saturating_mul(100) / self.pack_size.max(1);
2091        if pct == self.last_progress_pct {
2092            return;
2093        }
2094        self.last_progress_pct = pct;
2095        self.progress.set_phase(format!(
2096            "uploading {}/{} bytes ({pct}%)",
2097            self.offset, self.pack_size
2098        ));
2099    }
2100
2101    fn finish(mut self) -> Result<(), ProtocolError> {
2102        self.send_buffer().map_err(|err| {
2103            ProtocolError::InvalidState(format!("send final Git pack chunk: {err}"))
2104        })?;
2105        if self.offset != self.pack_size {
2106            return Err(ProtocolError::InvalidState(format!(
2107                "Git pack stream length mismatch: expected {}, got {}",
2108                self.pack_size, self.offset
2109            )));
2110        }
2111        Ok(())
2112    }
2113}
2114
2115impl Write for GitPackPushMessageWriter {
2116    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
2117        let written = buf.len();
2118        while !buf.is_empty() {
2119            if self.offset == self.pack_size {
2120                return Err(io::Error::new(
2121                    io::ErrorKind::InvalidData,
2122                    format!("Git pack stream wrote past planned size {}", self.pack_size),
2123                ));
2124            }
2125            let capacity = self
2126                .chunk_size
2127                .checked_sub(self.buffer.len())
2128                .ok_or_else(|| {
2129                    io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk buffer overflow")
2130                })?;
2131            let take = capacity.min(buf.len());
2132            self.buffer.extend_from_slice(&buf[..take]);
2133            buf = &buf[take..];
2134            if self.buffer.len() == self.chunk_size {
2135                self.send_buffer()?;
2136            }
2137        }
2138        Ok(written)
2139    }
2140
2141    fn flush(&mut self) -> io::Result<()> {
2142        Ok(())
2143    }
2144}
2145
2146/// Build a single `GitRefUpdateTransfer` message.
2147///
2148/// `checkpoint` is `Some(..)` for the native checkpoint path and `None` for
2149/// the git-mirror path — the `None` case is the wire signal the weft server
2150/// uses to admit checkpoint-less multi-ref pushes (plan §B.4). `peeled_oid`
2151/// is set for annotated-tag refs (the underlying object the tag names).
2152fn git_ref_update_message(
2153    name: &str,
2154    kind: GrpcGitRefKind,
2155    target_oid: GitObjectId,
2156    peeled_oid: Option<GitObjectId>,
2157    checkpoint: Option<GitCheckpointTransfer>,
2158) -> PushMessage {
2159    git_lane_push_message(git_lane_transfer::Body::RefUpdate(GitRefUpdateTransfer {
2160        name: name.to_string(),
2161        kind: kind as i32,
2162        target_oid: target_oid.as_bytes().to_vec().into(),
2163        peeled_oid: peeled_oid
2164            .map(|oid| oid.as_bytes().to_vec())
2165            .unwrap_or_default()
2166            .into(),
2167        expected_missing: false,
2168        expected_target_oid: Vec::new().into(),
2169        checkpoint,
2170    }))
2171}
2172
2173fn apply_git_ref_expectation_value(
2174    message: &mut PushMessage,
2175    expectation: &GitRefRemoteExpectation,
2176) -> Result<(), ProtocolError> {
2177    let Some(push_message::Body::GitLane(GitLaneTransfer {
2178        body: Some(git_lane_transfer::Body::RefUpdate(update)),
2179    })) = message.body.as_mut()
2180    else {
2181        return Err(ProtocolError::InvalidState(
2182            "Git lane push plan missing ref update message".to_string(),
2183        ));
2184    };
2185    match expectation {
2186        GitRefRemoteExpectation::Missing => {
2187            update.expected_missing = true;
2188            update.expected_target_oid = Vec::new().into();
2189        }
2190        GitRefRemoteExpectation::Value(oid) => {
2191            update.expected_missing = false;
2192            update.expected_target_oid = oid.clone().into();
2193        }
2194    }
2195    Ok(())
2196}
2197
/// What the client expects the server to currently hold for a Git ref; sent
/// along with a ref update as its precondition.
#[derive(Clone, Debug, PartialEq, Eq)]
enum GitRefRemoteExpectation {
    /// The ref is expected to be absent on the server (sent as
    /// `expected_missing = true` with an empty expected oid).
    Missing,
    /// The ref is expected to point at this oid, given as raw bytes (sent as
    /// `expected_target_oid`).
    Value(Vec<u8>),
}
2203
2204fn parse_git_ref_expectation(
2205    remote_revision_address: &str,
2206) -> Result<GitRefRemoteExpectation, ProtocolError> {
2207    if remote_revision_address.is_empty() {
2208        return Ok(GitRefRemoteExpectation::Missing);
2209    }
2210
2211    match remote_revision_address.parse::<RevisionAddress>() {
2212        Ok(RevisionAddress::Heddle(_)) => Ok(GitRefRemoteExpectation::Missing),
2213        Ok(RevisionAddress::GitCommit(oid)) => hex::decode(&oid)
2214            .map(GitRefRemoteExpectation::Value)
2215            .map_err(|err| {
2216                ProtocolError::InvalidState(format!(
2217                    "server returned invalid Git remote_revision_address: {err}"
2218                ))
2219            }),
2220        Err(err) => Err(ProtocolError::InvalidState(format!(
2221            "server returned invalid remote_revision_address {remote_revision_address:?}: {err}"
2222        ))),
2223    }
2224}
2225
2226fn git_lane_push_message(body: git_lane_transfer::Body) -> PushMessage {
2227    PushMessage {
2228        body: Some(push_message::Body::GitLane(GitLaneTransfer {
2229            body: Some(body),
2230        })),
2231    }
2232}
2233
2234fn git_lane_transfer_size(transfer: &GitLaneTransfer) -> usize {
2235    match transfer.body.as_ref() {
2236        Some(git_lane_transfer::Body::Pack(pack)) => pack.pack_chunk.len(),
2237        Some(git_lane_transfer::Body::RefUpdate(update)) => update
2238            .target_oid
2239            .len()
2240            .saturating_add(update.peeled_oid.len())
2241            .saturating_add(update.expected_target_oid.len())
2242            .saturating_add(
2243                update
2244                    .checkpoint
2245                    .as_ref()
2246                    .map(git_checkpoint_transfer_size)
2247                    .unwrap_or_default(),
2248            ),
2249        Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2250            git_checkpoint_transfer_size(checkpoint)
2251        }
2252        None => 0,
2253    }
2254}
2255
2256fn git_checkpoint_transfer_size(checkpoint: &GitCheckpointTransfer) -> usize {
2257    checkpoint
2258        .heddle_change_id
2259        .len()
2260        .saturating_add(checkpoint.git_commit_oid.len())
2261        .saturating_add(checkpoint.thread.len())
2262        .saturating_add(checkpoint.metadata_json.len())
2263}
2264
/// Pull-side tracker for the Git pack install currently in progress, if any.
#[derive(Default)]
struct GitPackPullInstallState {
    /// The install being fed chunks; `Some` from the first chunk of a pack
    /// until its final chunk has been installed, `None` otherwise.
    active: Option<GitPackPullInstall>,
}
2269
2270impl GitPackPullInstallState {
2271    fn ensure_idle(&self) -> Result<(), ProtocolError> {
2272        if self.active.is_none() {
2273            Ok(())
2274        } else {
2275            Err(ProtocolError::InvalidState(
2276                "Git pack transfer ended before final chunk".to_string(),
2277            ))
2278        }
2279    }
2280
2281    fn receive_chunk(
2282        &mut self,
2283        git_repo: &SleyRepository,
2284        pack: GitPackTransfer,
2285    ) -> Result<(), ProtocolError> {
2286        if pack.transfer_id.is_empty() {
2287            return Err(ProtocolError::InvalidState(
2288                "Git pack transfer_id is required".to_string(),
2289            ));
2290        }
2291        if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
2292            return Err(ProtocolError::InvalidState(format!(
2293                "Git pack exceeds maximum transfer size of {} bytes",
2294                wire::MAX_RECEIVED_GIT_PACK_SIZE
2295            )));
2296        }
2297        if pack.pack_chunk.is_empty() {
2298            return Err(ProtocolError::InvalidState(
2299                "Git pack chunk must not be empty".to_string(),
2300            ));
2301        }
2302        let pack_id =
2303            GitObjectId::from_raw(git_repo.object_format(), &pack.pack_id).map_err(|err| {
2304                ProtocolError::InvalidState(format!("GitPackTransfer.pack_id: {err}"))
2305            })?;
2306        if self.active.is_none() {
2307            if pack.offset != 0 {
2308                return Err(ProtocolError::InvalidState(format!(
2309                    "Git pack offset mismatch: expected 0, got {}",
2310                    pack.offset
2311                )));
2312            }
2313            if pack.chunk_index != 0 {
2314                return Err(ProtocolError::InvalidState(format!(
2315                    "Git pack chunk index mismatch: expected 0, got {}",
2316                    pack.chunk_index
2317                )));
2318            }
2319            let writer = git_repo
2320                .objects()
2321                .begin_raw_pack_install(pack_id, pack.pack_size)
2322                .map_err(|err| {
2323                    ProtocolError::InvalidState(format!("begin Git pack install: {err}"))
2324                })?;
2325            self.active = Some(GitPackPullInstall {
2326                transfer_id: pack.transfer_id.clone(),
2327                pack_id,
2328                pack_size: pack.pack_size,
2329                next_offset: 0,
2330                next_chunk_index: 0,
2331                writer,
2332            });
2333        }
2334
2335        let active = self.active.as_mut().ok_or_else(|| {
2336            ProtocolError::InvalidState("Git pack install not active".to_string())
2337        })?;
2338        active.receive_chunk(&pack, pack_id)?;
2339        if pack.is_final_chunk {
2340            let active = self.active.take().ok_or_else(|| {
2341                ProtocolError::InvalidState("Git pack install not active".to_string())
2342            })?;
2343            active.finish()?;
2344            git_repo.refresh_objects();
2345        }
2346        Ok(())
2347    }
2348}
2349
2350struct GitPackPullInstall {
2351    transfer_id: String,
2352    pack_id: GitObjectId,
2353    pack_size: u64,
2354    next_offset: u64,
2355    next_chunk_index: u32,
2356    writer: sley::plumbing::sley_odb::RawPackStreamingInstall,
2357}
2358
2359impl GitPackPullInstall {
2360    fn receive_chunk(
2361        &mut self,
2362        pack: &GitPackTransfer,
2363        pack_id: GitObjectId,
2364    ) -> Result<(), ProtocolError> {
2365        if self.transfer_id != pack.transfer_id {
2366            return Err(ProtocolError::InvalidState(format!(
2367                "Git pack transfer id changed from {:?} to {:?}",
2368                self.transfer_id, pack.transfer_id
2369            )));
2370        }
2371        if self.pack_id != pack_id {
2372            return Err(ProtocolError::InvalidState(
2373                "Git pack id changed during transfer".to_string(),
2374            ));
2375        }
2376        if self.pack_size != pack.pack_size {
2377            return Err(ProtocolError::InvalidState(
2378                "Git pack size changed during transfer".to_string(),
2379            ));
2380        }
2381        if pack.offset != self.next_offset {
2382            return Err(ProtocolError::InvalidState(format!(
2383                "Git pack offset mismatch: expected {}, got {}",
2384                self.next_offset, pack.offset
2385            )));
2386        }
2387        if pack.chunk_index != self.next_chunk_index {
2388            return Err(ProtocolError::InvalidState(format!(
2389                "Git pack chunk index mismatch: expected {}, got {}",
2390                self.next_chunk_index, pack.chunk_index
2391            )));
2392        }
2393        let chunk_len = u64::try_from(pack.pack_chunk.len()).map_err(|_| {
2394            ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
2395        })?;
2396        let next_offset = self
2397            .next_offset
2398            .checked_add(chunk_len)
2399            .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
2400        if next_offset > self.pack_size {
2401            return Err(ProtocolError::InvalidState(
2402                "Git pack chunk exceeds declared pack size".to_string(),
2403            ));
2404        }
2405        self.writer.write_all(&pack.pack_chunk).map_err(|err| {
2406            ProtocolError::InvalidState(format!("write streamed Git pack chunk: {err}"))
2407        })?;
2408        self.next_offset = next_offset;
2409        self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
2410            ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
2411        })?;
2412        if pack.is_final_chunk {
2413            if self.next_offset != self.pack_size {
2414                return Err(ProtocolError::InvalidState(format!(
2415                    "Git pack final size mismatch: declared {}, received {}",
2416                    self.pack_size, self.next_offset
2417                )));
2418            }
2419        } else if self.next_offset == self.pack_size {
2420            return Err(ProtocolError::InvalidState(
2421                "Git pack reached declared size without final chunk marker".to_string(),
2422            ));
2423        }
2424        Ok(())
2425    }
2426
2427    fn finish(self) -> Result<(), ProtocolError> {
2428        self.writer
2429            .finish()
2430            .map_err(|err| ProtocolError::InvalidState(format!("install Git pack: {err}")))?;
2431        Ok(())
2432    }
2433}
2434
2435fn accept_git_lane_pull_transfer(
2436    repo: &Repository,
2437    git_repo: &mut Option<SleyRepository>,
2438    git_pack_state: &mut GitPackPullInstallState,
2439    transfer: GitLaneTransfer,
2440) -> Result<(), ProtocolError> {
2441    if repo.capability() != RepositoryCapability::GitOverlay {
2442        return Err(ProtocolError::InvalidState(format!(
2443            "received git-lane pull transfer for non-GitOverlay repository (capability {:?})",
2444            repo.capability()
2445        )));
2446    }
2447    match transfer.body {
2448        Some(git_lane_transfer::Body::Pack(pack)) => {
2449            accept_git_lane_pack(repo, git_repo, git_pack_state, pack)
2450        }
2451        Some(git_lane_transfer::Body::RefUpdate(update)) => {
2452            accept_git_lane_ref_update(repo, git_repo, update)
2453        }
2454        Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2455            record_git_lane_checkpoint(repo, git_lane_sley_repository(repo, git_repo)?, checkpoint)
2456        }
2457        None => Err(ProtocolError::InvalidState(
2458            "GitLaneTransfer body is required".to_string(),
2459        )),
2460    }
2461}
2462
2463fn accept_git_lane_pack(
2464    repo: &Repository,
2465    git_repo: &mut Option<SleyRepository>,
2466    git_pack_state: &mut GitPackPullInstallState,
2467    pack: GitPackTransfer,
2468) -> Result<(), ProtocolError> {
2469    let git_repo = git_lane_sley_repository(repo, git_repo)?;
2470    git_pack_state.receive_chunk(git_repo, pack)?;
2471    Ok(())
2472}
2473
2474/// Apply a server-originated Git ref update on the pull stream.
2475///
2476/// Pull-side ref application is unconditional: we commit the local Git ref with
2477/// [`RefPrecondition::Any`] and do not compare against a prior target oid. That
2478/// is deliberate and **not** symmetric with push-side compare-and-set, where the
2479/// client transmits `expected_target_oid` / `expected_missing` from `ListRefs`.
2480/// The pull stream is single-threaded and server-trusted — the client applies
2481/// ref updates in the order the server sends them after installing the
2482/// accompanying pack, so there is no concurrent local writer racing this path.
2483fn accept_git_lane_ref_update(
2484    repo: &Repository,
2485    git_repo: &mut Option<SleyRepository>,
2486    update: GitRefUpdateTransfer,
2487) -> Result<(), ProtocolError> {
2488    let git_repo = git_lane_sley_repository(repo, git_repo)?;
2489    let target = git_oid_from_bytes(
2490        git_repo,
2491        "GitRefUpdateTransfer.target_oid",
2492        &update.target_oid,
2493    )?;
2494    git_repo.read_commit(&target).map_err(|err| {
2495        ProtocolError::InvalidState(format!(
2496            "Git ref {} target commit {} is not present after pack receive: {err}",
2497            update.name,
2498            target.to_hex()
2499        ))
2500    })?;
2501    let refs = git_repo.references();
2502    let mut tx = refs.transaction();
2503    tx.update_to(
2504        update.name.clone(),
2505        ReferenceTarget::Direct(target),
2506        RefPrecondition::Any,
2507        None,
2508    );
2509    tx.commit().map_err(|err| {
2510        ProtocolError::InvalidState(format!("update Git ref {}: {err}", update.name))
2511    })?;
2512    if let Some(checkpoint) = update.checkpoint {
2513        record_git_lane_checkpoint(repo, git_repo, checkpoint)?;
2514    }
2515    Ok(())
2516}
2517
2518fn record_git_lane_checkpoint(
2519    repo: &Repository,
2520    git_repo: &SleyRepository,
2521    checkpoint: GitCheckpointTransfer,
2522) -> Result<(), ProtocolError> {
2523    let state = ChangeId::try_from_slice(&checkpoint.heddle_change_id)
2524        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2525    let commit_oid = git_oid_from_bytes(
2526        git_repo,
2527        "GitCheckpointTransfer.git_commit_oid",
2528        &checkpoint.git_commit_oid,
2529    )?;
2530    let commit_hex = commit_oid.to_hex();
2531    if repo
2532        .latest_git_checkpoint_for_change(&state)
2533        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2534        .is_some_and(|record| record.git_commit == commit_hex)
2535    {
2536        return Ok(());
2537    }
2538    let summary = serde_json::from_str::<serde_json::Value>(&checkpoint.metadata_json)
2539        .ok()
2540        .and_then(|metadata| {
2541            metadata
2542                .get("summary")
2543                .and_then(serde_json::Value::as_str)
2544                .map(str::to_string)
2545        })
2546        .unwrap_or_else(|| "pulled Git checkpoint".to_string());
2547    repo.record_git_checkpoint(&state, commit_hex, summary)
2548        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2549    Ok(())
2550}
2551
2552fn git_lane_sley_repository<'a>(
2553    repo: &Repository,
2554    git_repo: &'a mut Option<SleyRepository>,
2555) -> Result<&'a SleyRepository, ProtocolError> {
2556    if git_repo.is_none() {
2557        *git_repo = Some(
2558            repo.git_overlay_sley_repository()
2559                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2560                .ok_or_else(|| {
2561                    ProtocolError::InvalidState(
2562                        "git-overlay repository has no Git store".to_string(),
2563                    )
2564                })?,
2565        );
2566    }
2567    match git_repo.as_ref() {
2568        Some(git_repo) => Ok(git_repo),
2569        None => Err(ProtocolError::InvalidState(
2570            "git-overlay repository has no Git store".to_string(),
2571        )),
2572    }
2573}
2574
2575fn git_oid_from_bytes(
2576    git_repo: &SleyRepository,
2577    field: &str,
2578    bytes: &[u8],
2579) -> Result<GitObjectId, ProtocolError> {
2580    GitObjectId::from_raw(git_repo.object_format(), bytes)
2581        .map_err(|err| ProtocolError::InvalidState(format!("{field}: {err}")))
2582}
2583
2584async fn send_native_pack_streaming_messages(
2585    tx: &mpsc::Sender<PushMessage>,
2586    repo: &Repository,
2587    objects: &[ObjectInfo],
2588    transfer_id: &str,
2589    chunk_size: usize,
2590    transport: &super::helpers::HostedTransportPolicy,
2591    transport_mode: TransportMode,
2592) -> Result<(), ProtocolError> {
2593    let object_count = u64::try_from(objects.len()).map_err(|_| {
2594        ProtocolError::InvalidState("native pack object count exceeds u64".to_string())
2595    })?;
2596    let mut writer = wire::NativePackStreamingWriter::new_in(repo.heddle_dir(), object_count)?;
2597    let mut pack_reader = wire::GrowingPackChunkReader::open(writer.pack_path(), chunk_size)?;
2598    let (loaded_tx, mut loaded_rx) = mpsc::channel::<(
2599        usize,
2600        Result<wire::ObjectData, ProtocolError>,
2601    )>(NATIVE_PACK_OBJECT_PREFETCH_LIMIT);
2602    let store = repo.store().clone();
2603    let object_plan = objects.to_vec();
2604    let loader = tokio::task::spawn_blocking(move || {
2605        load_native_pack_objects_parallel(store, object_plan, loaded_tx);
2606    });
2607
2608    let mut next_index = 0usize;
2609    let mut pending = BTreeMap::new();
2610    while next_index < objects.len() {
2611        let (index, object) = loaded_rx.recv().await.ok_or_else(|| {
2612            ProtocolError::InvalidState(
2613                "native pack object loader stopped before sending all objects".to_string(),
2614            )
2615        })?;
2616        pending.insert(index, object);
2617
2618        while let Some(object) = pending.remove(&next_index) {
2619            let object = object?;
2620            let should_drain = object.data.len() >= chunk_size
2621                || (next_index + 1).is_multiple_of(NATIVE_PACK_DRAIN_OBJECT_INTERVAL);
2622            writer.add_object_data(object)?;
2623            if should_drain {
2624                writer.flush_pack()?;
2625                drain_growing_native_pack_stream(
2626                    tx,
2627                    &mut pack_reader,
2628                    false,
2629                    PackStreamKind::Pack,
2630                    transfer_id,
2631                    transport,
2632                    transport_mode,
2633                )
2634                .await?;
2635            }
2636            next_index += 1;
2637        }
2638    }
2639    loader.await.map_err(|err| {
2640        ProtocolError::InvalidState(format!("native pack object loader task failed: {err}"))
2641    })?;
2642
2643    let bundle = writer.finish()?;
2644    drain_growing_native_pack_stream(
2645        tx,
2646        &mut pack_reader,
2647        true,
2648        PackStreamKind::Pack,
2649        transfer_id,
2650        transport,
2651        transport_mode,
2652    )
2653    .await?;
2654    send_native_pack_file_stream(
2655        tx,
2656        &bundle.index_path,
2657        PackStreamKind::Index,
2658        transfer_id,
2659        chunk_size,
2660        transport,
2661        transport_mode,
2662    )
2663    .await
2664}
2665
2666fn load_native_pack_objects_parallel(
2667    store: AnyStore,
2668    objects: Vec<ObjectInfo>,
2669    tx: mpsc::Sender<(usize, Result<wire::ObjectData, ProtocolError>)>,
2670) {
2671    if objects.is_empty() {
2672        return;
2673    }
2674    let worker_count = native_pack_object_load_worker_count(objects.len());
2675    let objects = Arc::new(objects);
2676    let next_index = Arc::new(AtomicUsize::new(0));
2677
2678    std::thread::scope(|scope| {
2679        for _ in 0..worker_count {
2680            let store = store.clone();
2681            let objects = Arc::clone(&objects);
2682            let next_index = Arc::clone(&next_index);
2683            let tx = tx.clone();
2684            scope.spawn(move || {
2685                loop {
2686                    let index = next_index.fetch_add(1, Ordering::Relaxed);
2687                    let Some(info) = objects.get(index) else {
2688                        break;
2689                    };
2690                    let object = wire::load_object_data(&store, &info.id, info.obj_type);
2691                    if tx.blocking_send((index, object)).is_err() {
2692                        break;
2693                    }
2694                }
2695            });
2696        }
2697    });
2698}
2699
2700fn native_pack_object_load_worker_count(object_count: usize) -> usize {
2701    let available = std::thread::available_parallelism()
2702        .map(usize::from)
2703        .unwrap_or(1);
2704    object_count
2705        .min(available)
2706        .clamp(1, NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT)
2707}
2708
2709async fn drain_growing_native_pack_stream(
2710    tx: &mpsc::Sender<PushMessage>,
2711    reader: &mut wire::GrowingPackChunkReader,
2712    final_stream: bool,
2713    stream_kind: PackStreamKind,
2714    transfer_id: &str,
2715    transport: &super::helpers::HostedTransportPolicy,
2716    transport_mode: TransportMode,
2717) -> Result<(), ProtocolError> {
2718    while let Some((offset, chunk_index, data, is_final_chunk)) =
2719        reader.next_available_chunk(final_stream)?
2720    {
2721        send_pack_chunk(
2722            tx,
2723            stream_kind,
2724            data,
2725            transfer_id,
2726            transport,
2727            transport_mode,
2728            chunk_index,
2729            offset,
2730            is_final_chunk,
2731        )
2732        .await?;
2733    }
2734    Ok(())
2735}
2736
2737async fn send_native_pack_file_stream(
2738    tx: &mpsc::Sender<PushMessage>,
2739    path: &std::path::Path,
2740    stream_kind: PackStreamKind,
2741    transfer_id: &str,
2742    chunk_size: usize,
2743    transport: &super::helpers::HostedTransportPolicy,
2744    transport_mode: TransportMode,
2745) -> Result<(), ProtocolError> {
2746    let mut reader = wire::PackFileChunkReader::open(path, chunk_size)?;
2747    while let Some((offset, chunk_index, data, is_final_chunk)) = reader.next_chunk()? {
2748        send_pack_chunk(
2749            tx,
2750            stream_kind,
2751            data,
2752            transfer_id,
2753            transport,
2754            transport_mode,
2755            chunk_index,
2756            offset,
2757            is_final_chunk,
2758        )
2759        .await?;
2760    }
2761    Ok(())
2762}
2763
2764#[allow(clippy::too_many_arguments)]
2765async fn send_pack_chunk(
2766    tx: &mpsc::Sender<PushMessage>,
2767    stream_kind: PackStreamKind,
2768    data: Vec<u8>,
2769    transfer_id: &str,
2770    transport: &super::helpers::HostedTransportPolicy,
2771    transport_mode: TransportMode,
2772    chunk_index: u32,
2773    offset: u64,
2774    is_final_chunk: bool,
2775) -> Result<(), ProtocolError> {
2776    let chunk_length = data.len().min(u32::MAX as usize) as u32;
2777    tx.send(PushMessage {
2778        body: Some(push_message::Body::Pack(PackChunk {
2779            stream_kind: stream_kind as i32,
2780            data: data.into(),
2781            transfer: Some(transport.transfer_checkpoint_with_mode(
2782                transfer_id,
2783                transport_mode,
2784                chunk_index,
2785                offset,
2786                is_final_chunk,
2787            )),
2788            chunk_length,
2789            is_final_chunk,
2790        })),
2791    })
2792    .await
2793    .map_err(|_| ProtocolError::InvalidState("push stream closed unexpectedly".to_string()))
2794}
2795
2796fn preferred_transport_mode(
2797    transport: &super::helpers::HostedTransportPolicy,
2798    object_count: usize,
2799) -> TransportMode {
2800    let _ = transport;
2801    let _ = object_count;
2802    TransportMode::NativePack
2803}
2804
2805#[cfg(test)]
2806mod tests {
2807    use std::collections::HashSet;
2808
2809    use chrono::{TimeZone, Utc};
2810    use cli_shared::ClientConfig;
2811    use grpc::heddle::v1::{
2812        ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
2813        TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
2814        repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
2815    };
2816    use objects::{
2817        object::{
2818            Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
2819            StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
2820        },
2821        store::ObjectStore,
2822    };
2823    use tempfile::TempDir;
2824    use tonic::{Response, Status, transport::Server};
2825    use wire::{ObjectId, ObjectInfo};
2826
2827    use super::*;
2828    use crate::grpc_hosted::helpers::{descriptor_id_from_info, to_proto_object_info};
2829
2830    fn temp_repo() -> (TempDir, Repository) {
2831        let dir = TempDir::new().expect("tempdir");
2832        let repo = Repository::init_default(dir.path()).expect("init repo");
2833        (dir, repo)
2834    }
2835
2836    /// An unseeded repo with no thread heads — the shape `heddle clone`
2837    /// creates locally before its first pull (`Repository::init`, not
2838    /// `init_default`).
2839    fn temp_repo_unseeded() -> (TempDir, Repository) {
2840        let dir = TempDir::new().expect("tempdir");
2841        let repo = Repository::init(dir.path()).expect("init repo");
2842        (dir, repo)
2843    }
2844
2845    fn redaction_info(blob: ContentHash) -> ObjectInfo {
2846        ObjectInfo {
2847            id: ObjectId::Hash(blob),
2848            obj_type: ObjectType::Redaction,
2849            size: 0,
2850            delta_base: None,
2851        }
2852    }
2853
2854    fn state_info(state: ChangeId) -> ObjectInfo {
2855        ObjectInfo {
2856            id: ObjectId::ChangeId(state),
2857            obj_type: ObjectType::State,
2858            size: 0,
2859            delta_base: None,
2860        }
2861    }
2862
2863    fn state_visibility_info(state: ChangeId) -> ObjectInfo {
2864        ObjectInfo {
2865            id: ObjectId::ChangeId(state),
2866            obj_type: ObjectType::StateVisibility,
2867            size: 0,
2868            delta_base: None,
2869        }
2870    }
2871
2872    fn sample_blob() -> ContentHash {
2873        ContentHash::from_bytes([7u8; 32])
2874    }
2875
2876    #[test]
2877    fn descriptor_id_from_info_matches_proto_encode_path() {
2878        let infos = [
2879            redaction_info(sample_blob()),
2880            state_info(ChangeId::from_bytes([3u8; 16])),
2881            state_visibility_info(ChangeId::from_bytes([9u8; 16])),
2882        ];
2883        for info in infos {
2884            assert_eq!(
2885                descriptor_id_from_info(&info),
2886                descriptor_id(&to_proto_object_info(&info)),
2887                "keying path must stay byte-identical to the throwaway-encode path",
2888            );
2889        }
2890    }
2891
2892    #[test]
2893    fn git_lane_messages_pack_reachable_commit_graph_and_attach_checkpoint() {
2894        let dir = TempDir::new().expect("tempdir");
2895        let git = sley::Repository::init(dir.path()).expect("init git");
2896        let blob_oid = git.write_blob(b"hello\n").expect("write blob");
2897        let tree = sley::TreeObject {
2898            entries: vec![sley::plumbing::sley_object::TreeEntry {
2899                mode: 0o100644,
2900                name: sley::BString::from(b"hello.txt"),
2901                oid: blob_oid,
2902            }],
2903        };
2904        let tree_oid = git
2905            .write_raw_object(sley::GitObjectType::Tree, tree.write())
2906            .expect("write tree");
2907        let commit = sley::CommitObject {
2908            tree: tree_oid,
2909            parents: Vec::new(),
2910            author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2911            committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2912            encoding: None,
2913            message: b"checkpoint\n".to_vec(),
2914        };
2915        let commit_oid = git
2916            .write_raw_object(sley::GitObjectType::Commit, commit.write())
2917            .expect("write commit");
2918
2919        let pack = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64 * 1024)
2920            .expect("build git lane pack plan");
2921        assert_eq!(pack.pack_id.len(), git.object_format().raw_len());
2922        let (tx, mut rx) = mpsc::channel(8);
2923        stream_git_pack_messages_blocking(tx, pack.clone(), 64 * 1024, Progress::null())
2924            .expect("stream git lane pack");
2925        let mut pack_bytes = Vec::new();
2926        let mut chunks = Vec::new();
2927        while let Some(chunk) = rx.blocking_recv() {
2928            let Some(push_message::Body::GitLane(GitLaneTransfer {
2929                body: Some(git_lane_transfer::Body::Pack(pack)),
2930            })) = chunk.body
2931            else {
2932                panic!("expected git pack chunk");
2933            };
2934            pack_bytes.extend_from_slice(&pack.pack_chunk);
2935            chunks.push(pack);
2936        }
2937        assert!(!chunks.is_empty());
2938        assert!(chunks.last().expect("pack chunk").is_final_chunk);
2939        assert_eq!(pack_bytes.len() as u64, pack.pack_size);
2940        let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
2941            .expect("generated pack should index");
2942        assert_eq!(indexed.pack_id.as_bytes(), pack.pack_id.as_slice());
2943        assert_eq!(indexed.objects.len(), 3);
2944
2945        let state = ChangeId::from_bytes([9u8; 16]);
2946        let ref_message = git_ref_update_message(
2947            "refs/heads/main",
2948            GrpcGitRefKind::Branch,
2949            commit_oid,
2950            None,
2951            Some(GitCheckpointTransfer {
2952                heddle_change_id: state.as_bytes().to_vec().into(),
2953                git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2954                thread: "main".to_string(),
2955                metadata_json: String::new(),
2956            }),
2957        );
2958        let Some(push_message::Body::GitLane(GitLaneTransfer {
2959            body: Some(git_lane_transfer::Body::RefUpdate(update)),
2960        })) = ref_message.body
2961        else {
2962            panic!("expected git ref update message");
2963        };
2964        assert_eq!(update.name, "refs/heads/main");
2965        assert_eq!(update.kind, GrpcGitRefKind::Branch as i32);
2966        assert_eq!(update.target_oid.as_ref(), commit_oid.as_bytes());
2967        let checkpoint = update.checkpoint.expect("checkpoint");
2968        assert_eq!(checkpoint.heddle_change_id.as_ref(), state.as_bytes());
2969        assert_eq!(checkpoint.git_commit_oid.as_ref(), commit_oid.as_bytes());
2970        assert_eq!(checkpoint.thread, "main");
2971    }
2972
2973    fn sample_ref_update_message(commit_oid: GitObjectId) -> PushMessage {
2974        git_ref_update_message(
2975            "refs/heads/main",
2976            GrpcGitRefKind::Branch,
2977            commit_oid,
2978            None,
2979            Some(GitCheckpointTransfer {
2980                heddle_change_id: ChangeId::from_bytes([9u8; 16]).as_bytes().to_vec().into(),
2981                git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2982                thread: "main".to_string(),
2983                metadata_json: String::new(),
2984            }),
2985        )
2986    }
2987
2988    /// Write a distinct single-commit graph into `git` and return the commit
2989    /// oid. `seed` differentiates the tree content so each commit is unique.
2990    fn write_commit(git: &sley::Repository, seed: &str) -> GitObjectId {
2991        let blob_oid = git
2992            .write_blob(format!("content-{seed}\n").as_bytes())
2993            .expect("write blob");
2994        let tree = sley::TreeObject {
2995            entries: vec![sley::plumbing::sley_object::TreeEntry {
2996                mode: 0o100644,
2997                name: sley::BString::from(format!("{seed}.txt").into_bytes()),
2998                oid: blob_oid,
2999            }],
3000        };
3001        let tree_oid = git
3002            .write_raw_object(sley::GitObjectType::Tree, tree.write())
3003            .expect("write tree");
3004        let commit = sley::CommitObject {
3005            tree: tree_oid,
3006            parents: Vec::new(),
3007            author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
3008            committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
3009            encoding: None,
3010            message: format!("commit {seed}\n").into_bytes(),
3011        };
3012        git.write_raw_object(sley::GitObjectType::Commit, commit.write())
3013            .expect("write commit")
3014    }
3015
3016    fn set_ref(git: &mut sley::Repository, name: &str, target: GitObjectId) {
3017        let refs = git.references();
3018        let mut tx = refs.transaction();
3019        tx.update_to(
3020            name.to_string(),
3021            ReferenceTarget::Direct(target),
3022            RefPrecondition::Any,
3023            None,
3024        );
3025        tx.commit().expect("commit ref");
3026    }
3027
3028    fn mirror_ref_updates(plan: &GitLanePushPlan) -> Vec<GitRefUpdateTransfer> {
3029        plan.ref_updates
3030            .iter()
3031            .map(git_ref_update_from_message)
3032            .cloned()
3033            .collect()
3034    }
3035
3036    /// `ReachablePackPlan::prepare_to_file` must emit the same pack bytes as the
3037    /// legacy `write_reachable_pack_to_writer` path (wire checksum + size).
3038    #[test]
3039    fn git_lane_reachable_pack_plan_matches_legacy_writer_checksum() {
3040        let dir = TempDir::new().expect("tempdir");
3041        let git = sley::Repository::init(dir.path()).expect("init git");
3042        let commit_oid = write_commit(&git, "byte-identity");
3043
3044        let pack_plan = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64 * 1024)
3045            .expect("build pack plan");
3046
3047        let mut legacy_pack = Vec::new();
3048        let legacy = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
3049            git.objects().as_ref(),
3050            git.object_format(),
3051            std::iter::once(commit_oid),
3052            &HashSet::new(),
3053            &mut legacy_pack,
3054        )
3055        .expect("legacy reachable pack")
3056        .expect("legacy pack summary");
3057
3058        assert_eq!(pack_plan.pack_id, legacy.checksum.as_bytes().to_vec());
3059        assert_eq!(pack_plan.pack_size, legacy.pack_size);
3060        assert_eq!(pack_plan.pack_size, legacy_pack.len() as u64);
3061
3062        let mut planned_pack = Vec::new();
3063        pack_plan
3064            .pack_file
3065            .lock()
3066            .expect("lock pack tempfile")
3067            .seek(SeekFrom::Start(0))
3068            .expect("rewind pack tempfile");
3069        io::copy(
3070            &mut pack_plan
3071                .pack_file
3072                .lock()
3073                .expect("lock pack tempfile")
3074                .as_file_mut(),
3075            &mut planned_pack,
3076        )
3077        .expect("read planned pack");
3078        assert_eq!(planned_pack, legacy_pack);
3079    }
3080
3081    /// The mirror plan builder reads ALL direct refs (N branches + a tag),
3082    /// emits exactly N checkpoint-less ref updates with the correct kind, and
3083    /// builds ONE pack whose root set equals the resolved ref targets.
3084    #[test]
3085    fn git_mirror_plan_builds_one_pack_and_checkpointless_updates_for_all_refs() {
3086        let dir = TempDir::new().expect("tempdir");
3087        let mut git = sley::Repository::init(dir.path()).expect("init git");
3088
3089        let main_commit = write_commit(&git, "main");
3090        let feature_commit = write_commit(&git, "feature");
3091        let tagged_commit = write_commit(&git, "tagged");
3092
3093        set_ref(&mut git, "refs/heads/main", main_commit);
3094        set_ref(&mut git, "refs/heads/feature", feature_commit);
3095
3096        // Annotated tag pointing at `tagged_commit`.
3097        let tag = sley::TagObject {
3098            object: tagged_commit,
3099            object_type: sley::GitObjectType::Commit,
3100            name: b"v1".to_vec(),
3101            tagger: Some(b"Tester <test@example.com> 1700000000 +0000".to_vec()),
3102            message: b"release v1\n".to_vec(),
3103            raw_body: None,
3104        };
3105        let tag_oid = git
3106            .write_raw_object(sley::GitObjectType::Tag, tag.write())
3107            .expect("write tag");
3108        set_ref(&mut git, "refs/tags/v1", tag_oid);
3109
3110        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3111            .expect("build mirror plan");
3112
3113        let updates = mirror_ref_updates(&plan);
3114        assert_eq!(updates.len(), 3, "one ref update per direct ref");
3115
3116        // Every mirror-mode ref update MUST have `checkpoint: None` — the
3117        // discriminator the weft server keys on (plan §B.4).
3118        assert!(
3119            updates.iter().all(|u| u.checkpoint.is_none()),
3120            "mirror mode signals via checkpoint: None on every ref update",
3121        );
3122
3123        let by_name: HashMap<&str, &GitRefUpdateTransfer> =
3124            updates.iter().map(|u| (u.name.as_str(), u)).collect();
3125
3126        let main = by_name["refs/heads/main"];
3127        assert_eq!(main.kind, GrpcGitRefKind::Branch as i32);
3128        assert_eq!(main.target_oid.as_ref(), main_commit.as_bytes());
3129        assert!(main.peeled_oid.is_empty(), "commit refs are not peeled");
3130
3131        let feature = by_name["refs/heads/feature"];
3132        assert_eq!(feature.kind, GrpcGitRefKind::Branch as i32);
3133        assert_eq!(feature.target_oid.as_ref(), feature_commit.as_bytes());
3134
3135        let tag_update = by_name["refs/tags/v1"];
3136        assert_eq!(tag_update.kind, GrpcGitRefKind::Tag as i32);
3137        assert_eq!(tag_update.target_oid.as_ref(), tag_oid.as_bytes());
3138        assert_eq!(
3139            tag_update.peeled_oid.as_ref(),
3140            tagged_commit.as_bytes(),
3141            "annotated tag ref is peeled to its underlying commit",
3142        );
3143
3144        // ONE pack, whose root set equals the resolved ref targets.
3145        let mut pack_roots: Vec<GitObjectId> = plan.pack.roots.clone();
3146        pack_roots.sort_by_key(|oid| oid.to_hex());
3147        let mut expected_roots = vec![main_commit, feature_commit, tag_oid];
3148        expected_roots.sort_by_key(|oid| oid.to_hex());
3149        assert_eq!(
3150            pack_roots, expected_roots,
3151            "pack roots == resolved ref targets",
3152        );
3153
3154        // The single pack must actually contain the whole closure: main,
3155        // feature, tag object + tagged commit, plus their trees/blobs.
3156        let (tx, mut rx) = mpsc::channel(64);
3157        stream_git_pack_messages_blocking(tx, plan.pack.clone(), 64 * 1024, Progress::null())
3158            .expect("stream mirror pack");
3159        let mut pack_bytes = Vec::new();
3160        while let Some(message) = rx.blocking_recv() {
3161            if let Some(push_message::Body::GitLane(GitLaneTransfer {
3162                body: Some(git_lane_transfer::Body::Pack(pack)),
3163            })) = message.body
3164            {
3165                pack_bytes.extend_from_slice(&pack.pack_chunk);
3166            }
3167        }
3168        assert_eq!(
3169            pack_bytes.len() as u64,
3170            plan.pack.pack_size,
3171            "streamed pack size must match plan",
3172        );
3173        let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
3174            .expect("mirror pack indexes");
3175        let packed: HashSet<Vec<u8>> = indexed
3176            .objects
3177            .iter()
3178            .map(|obj| obj.oid.as_bytes().to_vec())
3179            .collect();
3180        for oid in [main_commit, feature_commit, tagged_commit, tag_oid] {
3181            assert!(
3182                packed.contains(oid.as_bytes()),
3183                "pack must contain {}",
3184                oid.to_hex()
3185            );
3186        }
3187    }
3188
3189    /// Symbolic refs (e.g. HEAD) are not emitted as ref updates — only their
3190    /// direct target ref is pushed.
3191    #[test]
3192    fn git_mirror_plan_skips_symbolic_refs() {
3193        let dir = TempDir::new().expect("tempdir");
3194        let mut git = sley::Repository::init(dir.path()).expect("init git");
3195        let main_commit = write_commit(&git, "main");
3196        set_ref(&mut git, "refs/heads/main", main_commit);
3197        // HEAD is symbolic → refs/heads/main; it must not become its own
3198        // ref update.
3199        {
3200            let refs = git.references();
3201            let mut tx = refs.transaction();
3202            tx.update_to(
3203                "HEAD".to_string(),
3204                ReferenceTarget::Symbolic("refs/heads/main".to_string()),
3205                RefPrecondition::Any,
3206                None,
3207            );
3208            tx.commit().expect("set HEAD");
3209        }
3210
3211        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3212            .expect("build mirror plan");
3213        let updates = mirror_ref_updates(&plan);
3214        assert_eq!(updates.len(), 1, "only the direct ref is pushed");
3215        assert_eq!(updates[0].name, "refs/heads/main");
3216    }
3217
3218    /// Per-ref remote expectations from `ListRefs` are applied to each ref
3219    /// update (compare-and-set); unlisted refs default to expected-missing.
3220    #[test]
3221    fn git_mirror_plan_applies_per_ref_remote_expectations() {
3222        let dir = TempDir::new().expect("tempdir");
3223        let mut git = sley::Repository::init(dir.path()).expect("init git");
3224        let main_commit = write_commit(&git, "main");
3225        let feature_commit = write_commit(&git, "feature");
3226        set_ref(&mut git, "refs/heads/main", main_commit);
3227        set_ref(&mut git, "refs/heads/feature", feature_commit);
3228
3229        let remote_oid = "89abcdef012345670123456789abcdef01234567";
3230        let mut expectations = HashMap::new();
3231        expectations.insert(
3232            "refs/heads/main".to_string(),
3233            GitRefRemoteExpectation::Value(hex::decode(remote_oid).expect("hex")),
3234        );
3235        // refs/heads/feature intentionally absent → expected-missing.
3236
3237        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &expectations)
3238            .expect("build mirror plan");
3239        let updates = mirror_ref_updates(&plan);
3240        let by_name: HashMap<&str, &GitRefUpdateTransfer> =
3241            updates.iter().map(|u| (u.name.as_str(), u)).collect();
3242
3243        let main = by_name["refs/heads/main"];
3244        assert!(!main.expected_missing);
3245        assert_eq!(hex::encode(main.expected_target_oid.as_ref()), remote_oid);
3246
3247        let feature = by_name["refs/heads/feature"];
3248        assert!(
3249            feature.expected_missing,
3250            "ref absent from ListRefs is expected-missing (create)",
3251        );
3252    }
3253
3254    /// GitHub-style `refs/pull/*` ref names must pass through the mirror plan
3255    /// unchanged: classified as `Other` (not branch/tag/note), packed with the
3256    /// correct target, and checkpoint-less. Confirms the #846 ref-name caveat.
3257    #[test]
3258    fn git_mirror_plan_includes_pull_request_refs() {
3259        let dir = TempDir::new().expect("tempdir");
3260        let mut git = sley::Repository::init(dir.path()).expect("init git");
3261        let pr_commit = write_commit(&git, "pr");
3262        set_ref(&mut git, "refs/pull/42/head", pr_commit);
3263
3264        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3265            .expect("build mirror plan");
3266        let updates = mirror_ref_updates(&plan);
3267        let pull = updates
3268            .iter()
3269            .find(|update| update.name == "refs/pull/42/head")
3270            .expect("refs/pull/* ref must be mirrored");
3271        assert_eq!(
3272            pull.kind,
3273            GrpcGitRefKind::Other as i32,
3274            "refs/pull/* classifies as Other",
3275        );
3276        assert_eq!(pull.target_oid.as_ref(), pr_commit.as_bytes());
3277        assert!(
3278            pull.checkpoint.is_none(),
3279            "mirror ref updates are checkpoint-less",
3280        );
3281        assert!(pull.peeled_oid.is_empty(), "a commit ref is not peeled");
3282    }
3283
3284    /// The default mirror push must ship CONTENT refs (heads, tags, and
3285    /// heddle's `refs/notes/heddle` state metadata) but EXCLUDE local-only
3286    /// bookkeeping namespaces (#846): `refs/stash`, `refs/remotes/*`,
3287    /// `refs/original/*`, `refs/replace/*`.
3288    #[test]
3289    fn git_mirror_plan_excludes_local_only_refs_and_keeps_content() {
3290        let dir = TempDir::new().expect("tempdir");
3291        let mut git = sley::Repository::init(dir.path()).expect("init git");
3292
3293        let main_commit = write_commit(&git, "main");
3294        let notes_commit = write_commit(&git, "notes");
3295        let stash_commit = write_commit(&git, "stash");
3296        let remote_commit = write_commit(&git, "remote");
3297        let original_commit = write_commit(&git, "original");
3298        let replace_commit = write_commit(&git, "replace");
3299
3300        // Content refs — these MUST be mirrored.
3301        set_ref(&mut git, "refs/heads/main", main_commit);
3302        set_ref(&mut git, "refs/notes/heddle", notes_commit);
3303
3304        // Local-only bookkeeping — these MUST be excluded.
3305        set_ref(&mut git, "refs/stash", stash_commit);
3306        set_ref(&mut git, "refs/remotes/origin/main", remote_commit);
3307        set_ref(&mut git, "refs/original/refs/heads/main", original_commit);
3308        set_ref(&mut git, "refs/replace/deadbeef", replace_commit);
3309
3310        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3311            .expect("build mirror plan");
3312        let names: Vec<&str> = plan
3313            .ref_updates
3314            .iter()
3315            .map(|m| git_ref_update_from_message(m).name.as_str())
3316            .collect();
3317
3318        assert!(
3319            names.contains(&"refs/heads/main"),
3320            "content branch is mirrored: {names:?}",
3321        );
3322        assert!(
3323            names.contains(&"refs/notes/heddle"),
3324            "heddle state-note ref is mirrored: {names:?}",
3325        );
3326        for excluded in [
3327            "refs/stash",
3328            "refs/remotes/origin/main",
3329            "refs/original/refs/heads/main",
3330            "refs/replace/deadbeef",
3331        ] {
3332            assert!(
3333                !names.contains(&excluded),
3334                "local-only ref {excluded} must NOT be mirrored: {names:?}",
3335            );
3336        }
3337        assert_eq!(names.len(), 2, "exactly the two content refs ship: {names:?}");
3338    }
3339
3340    /// A dangling ref in an EXCLUDED namespace (e.g. a stale
3341    /// `refs/original/*` filter-branch backup whose target object is gone)
3342    /// must not fail the whole push — it is filtered before the readability
3343    /// check. Content refs still ship.
3344    #[test]
3345    fn git_mirror_plan_ignores_dangling_local_only_ref() {
3346        let dir = TempDir::new().expect("tempdir");
3347        let mut git = sley::Repository::init(dir.path()).expect("init git");
3348
3349        let main_commit = write_commit(&git, "main");
3350        set_ref(&mut git, "refs/heads/main", main_commit);
3351
3352        // Point a local-only ref at a non-existent object oid.
3353        let missing = GitObjectId::from_hex(
3354            sley::ObjectFormat::Sha1,
3355            "0123456789abcdef0123456789abcdef01234567",
3356        )
3357        .expect("oid");
3358        set_ref(&mut git, "refs/original/refs/heads/main", missing);
3359
3360        let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3361            .expect("dangling local-only ref must not fail the mirror plan");
3362        let names: Vec<&str> = plan
3363            .ref_updates
3364            .iter()
3365            .map(|m| git_ref_update_from_message(m).name.as_str())
3366            .collect();
3367        assert_eq!(names, vec!["refs/heads/main"], "only content ships: {names:?}");
3368    }
3369
3370    #[test]
3371    fn git_ref_expectation_marks_missing_when_remote_has_no_git_revision() {
3372        let commit_oid = GitObjectId::from_hex(
3373            sley::ObjectFormat::Sha1,
3374            "0123456789abcdef0123456789abcdef01234567",
3375        )
3376        .expect("oid");
3377        let mut message = sample_ref_update_message(commit_oid);
3378
3379        let expectation = parse_git_ref_expectation("").expect("missing expectation");
3380        apply_git_ref_expectation_value(&mut message, &expectation).expect("apply expectation");
3381        let update = git_ref_update_from_message(&message);
3382        assert!(update.expected_missing);
3383        assert!(update.expected_target_oid.is_empty());
3384    }
3385
3386    #[test]
3387    fn git_ref_expectation_uses_remote_git_revision_oid() {
3388        let commit_oid = GitObjectId::from_hex(
3389            sley::ObjectFormat::Sha1,
3390            "0123456789abcdef0123456789abcdef01234567",
3391        )
3392        .expect("oid");
3393        let remote_oid = "89abcdef012345670123456789abcdef01234567";
3394        let mut message = sample_ref_update_message(commit_oid);
3395
3396        let expectation =
3397            parse_git_ref_expectation(&format!("git:{remote_oid}")).expect("git expectation");
3398        apply_git_ref_expectation_value(&mut message, &expectation).expect("apply expectation");
3399        let update = git_ref_update_from_message(&message);
3400        assert!(!update.expected_missing);
3401        assert_eq!(hex::encode(update.expected_target_oid.as_ref()), remote_oid);
3402    }
3403
3404    #[test]
3405    fn git_pack_stream_writer_emits_ordered_chunks() {
3406        let (tx, mut rx) = mpsc::channel(4);
3407        let mut writer = GitPackPushMessageWriter::new(
3408            tx,
3409            "git-pack:test".to_string(),
3410            vec![0x42; 20],
3411            10,
3412            4,
3413            Progress::null(),
3414        );
3415        writer.write_all(b"abcdefghij").expect("write pack bytes");
3416        writer.finish().expect("finish pack stream");
3417
3418        let mut chunks = Vec::new();
3419        while let Some(message) = rx.blocking_recv() {
3420            let Some(push_message::Body::GitLane(GitLaneTransfer {
3421                body: Some(git_lane_transfer::Body::Pack(pack)),
3422            })) = message.body
3423            else {
3424                panic!("expected git pack chunk");
3425            };
3426            chunks.push(pack);
3427        }
3428
3429        assert_eq!(chunks.len(), 3);
3430        assert_eq!(chunks[0].offset, 0);
3431        assert_eq!(chunks[0].chunk_index, 0);
3432        assert!(!chunks[0].is_final_chunk);
3433        assert_eq!(chunks[0].pack_chunk.as_ref(), b"abcd");
3434        assert_eq!(chunks[1].offset, 4);
3435        assert_eq!(chunks[1].chunk_index, 1);
3436        assert!(!chunks[1].is_final_chunk);
3437        assert_eq!(chunks[1].pack_chunk.as_ref(), b"efgh");
3438        assert_eq!(chunks[2].offset, 8);
3439        assert_eq!(chunks[2].chunk_index, 2);
3440        assert!(chunks[2].is_final_chunk);
3441        assert_eq!(chunks[2].pack_chunk.as_ref(), b"ij");
3442        assert!(
3443            chunks
3444                .iter()
3445                .all(|chunk| chunk.pack_id.as_ref() == &[0x42; 20][..])
3446        );
3447    }
3448
3449    /// A `Sink` that records the phase label of every rendered snapshot, so a
3450    /// test can assert on the human progress line the push seam drives.
3451    #[derive(Default)]
3452    struct PhaseCapturingSink {
3453        phases: std::sync::Mutex<Vec<String>>,
3454    }
3455
3456    impl PhaseCapturingSink {
3457        fn phases(&self) -> Vec<String> {
3458            self.phases.lock().unwrap().clone()
3459        }
3460    }
3461
3462    impl objects::Sink for PhaseCapturingSink {
3463        fn render(&self, snap: objects::ProgressSnapshot) {
3464            self.phases.lock().unwrap().push(snap.phase);
3465        }
3466    }
3467
3468    /// Streaming the Git pack must drive the generic progress substrate with a
3469    /// live "uploading N/M bytes" phase, ending at the full pack size. This is
3470    /// the DoD "live progress line" for the default git-overlay mirror push.
3471    #[test]
3472    fn git_pack_stream_reports_upload_progress() {
3473        let dir = TempDir::new().expect("tempdir");
3474        let git = sley::Repository::init(dir.path()).expect("init git");
3475        let commit_oid = write_commit(&git, "progress");
3476        // Small chunk size so the pack streams over several chunks.
3477        let pack = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64)
3478            .expect("build pack plan");
3479
3480        let sink = std::sync::Arc::new(PhaseCapturingSink::default());
3481        struct Forward(std::sync::Arc<PhaseCapturingSink>);
3482        impl objects::Sink for Forward {
3483            fn render(&self, snap: objects::ProgressSnapshot) {
3484                self.0.render(snap);
3485            }
3486        }
3487        let progress = Progress::with_sink(Box::new(Forward(std::sync::Arc::clone(&sink))));
3488
3489        let (tx, mut rx) = mpsc::channel(1024);
3490        stream_git_pack_messages_blocking(tx, pack.clone(), 64, progress.clone())
3491            .expect("stream pack");
3492        while rx.blocking_recv().is_some() {}
3493
3494        let phases = sink.phases();
3495        assert!(
3496            phases.iter().any(|phase| phase.contains("uploading")),
3497            "pack streamer must drive an 'uploading' progress phase; saw {phases:?}",
3498        );
3499        let last_uploading = phases
3500            .iter()
3501            .rev()
3502            .find(|phase| phase.contains("uploading"))
3503            .cloned();
3504        assert!(
3505            last_uploading
3506                .as_deref()
3507                .is_some_and(|phase| phase.contains(&pack.pack_size.to_string())),
3508            "final uploading phase must show the full pack size ({} bytes); saw {last_uploading:?}",
3509            pack.pack_size,
3510        );
3511    }
3512
3513    #[test]
3514    fn git_pack_pull_install_state_streams_pack_into_sley_store() {
3515        let source_dir = TempDir::new().expect("source tempdir");
3516        let source = sley::Repository::init(source_dir.path()).expect("init source git");
3517        let blob_oid = source.write_blob(b"streamed pull pack\n").expect("blob");
3518        let pack = sley::plumbing::sley_odb::build_reachable_pack(
3519            source.objects().as_ref(),
3520            source.object_format(),
3521            [blob_oid],
3522            &HashSet::new(),
3523        )
3524        .expect("build pack")
3525        .expect("reachable pack");
3526
3527        let dest_dir = TempDir::new().expect("dest tempdir");
3528        let dest = sley::Repository::init(dest_dir.path()).expect("init dest git");
3529        let mut state = GitPackPullInstallState::default();
3530        let chunk_size = 7usize;
3531        let mut offset = 0u64;
3532        for (chunk_index, chunk) in pack.pack.chunks(chunk_size).enumerate() {
3533            let next_offset = offset + chunk.len() as u64;
3534            state
3535                .receive_chunk(
3536                    &dest,
3537                    GitPackTransfer {
3538                        transfer_id: "git-pack:test".to_string(),
3539                        offset,
3540                        chunk_index: chunk_index as u32,
3541                        is_final_chunk: next_offset == pack.pack.len() as u64,
3542                        pack_size: pack.pack.len() as u64,
3543                        pack_chunk: chunk.to_vec().into(),
3544                        pack_id: pack.checksum.as_bytes().to_vec().into(),
3545                    },
3546                )
3547                .expect("receive chunk");
3548            offset = next_offset;
3549        }
3550
3551        state.ensure_idle().expect("stream should finish");
3552        let object = dest.read_object(&blob_oid).expect("read installed object");
3553        assert_eq!(object.body.as_slice(), b"streamed pull pack\n");
3554    }
3555
3556    #[test]
3557    fn accept_git_lane_pull_transfer_errors_for_non_overlay_repo() {
3558        let (_dir, repo) = temp_repo();
3559        assert_ne!(
3560            repo.capability(),
3561            RepositoryCapability::GitOverlay,
3562            "init_default repo must be non-GitOverlay for this guard test",
3563        );
3564        let mut git_repo = None;
3565        let mut state = GitPackPullInstallState::default();
3566        let transfer = GitLaneTransfer {
3567            body: Some(git_lane_transfer::Body::Pack(GitPackTransfer {
3568                transfer_id: "git-pack:test".to_string(),
3569                offset: 0,
3570                chunk_index: 0,
3571                is_final_chunk: true,
3572                pack_size: 0,
3573                pack_chunk: Vec::new().into(),
3574                pack_id: Vec::new().into(),
3575            })),
3576        };
3577        let err = accept_git_lane_pull_transfer(&repo, &mut git_repo, &mut state, transfer)
3578            .expect_err("git-lane pull to a non-GitOverlay repo must fail loud, not silently drop");
3579        assert!(
3580            matches!(err, ProtocolError::InvalidState(_)),
3581            "expected InvalidState protocol error, got {err:?}",
3582        );
3583    }
3584
3585    fn git_ref_update_from_message(message: &PushMessage) -> &GitRefUpdateTransfer {
3586        let Some(push_message::Body::GitLane(GitLaneTransfer {
3587            body: Some(git_lane_transfer::Body::RefUpdate(update)),
3588        })) = message.body.as_ref()
3589        else {
3590            panic!("expected git ref update message");
3591        };
3592        update
3593    }
3594
3595    fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
3596        let hex = hash.to_hex();
3597        let (prefix, rest) = hex.split_at(2);
3598        repo.heddle_dir()
3599            .join("objects")
3600            .join("trees")
3601            .join(prefix)
3602            .join(rest)
3603    }
3604
3605    fn sample_redaction(blob: ContentHash) -> Redaction {
3606        Redaction {
3607            redacted_blob: blob,
3608            state: ChangeId::from_bytes([1u8; 16]),
3609            path: "config/secrets.toml".into(),
3610            reason: "leaked credential".into(),
3611            redactor: Principal {
3612                name: "Grace Hopper".into(),
3613                email: "grace@example.com".into(),
3614            },
3615            redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
3616            signature: None,
3617            purged_at: None,
3618            supersedes: None,
3619        }
3620    }
3621
3622    fn sample_state_visibility(state: ChangeId) -> StateVisibility {
3623        StateVisibility {
3624            state,
3625            tier: VisibilityTier::Restricted {
3626                scope_label: "security-embargo".into(),
3627            },
3628            embargo_until: None,
3629            declarer: Principal {
3630                name: "Grace Hopper".into(),
3631                email: "grace@example.com".into(),
3632            },
3633            declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
3634            signature: None,
3635            supersedes: None,
3636        }
3637    }
3638
3639    #[test]
3640    fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
3641        for obj_type in wire::native_pack_excluded_object_types() {
3642            assert!(
3643                wire::TransferPartitions::<ObjectInfo>::is_sidecar_object_type(*obj_type),
3644                "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
3645            );
3646        }
3647    }
3648
3649    #[test]
3650    fn native_pack_required_tracks_packable_pull_wants() {
3651        let blob = sample_blob();
3652        let state = ChangeId::from_bytes([9u8; 16]);
3653
3654        let sidecar_only = RepositoryTransferPlan::from_object_infos(
3655            vec![state_visibility_info(state)],
3656            GitLaneTransferIntent::HeddleObjectsOnly,
3657        );
3658        assert!(!native_pack_required_for_pull(false, &sidecar_only));
3659
3660        let redaction_only = RepositoryTransferPlan::from_object_infos(
3661            vec![redaction_info(blob)],
3662            GitLaneTransferIntent::HeddleObjectsOnly,
3663        );
3664        assert!(!native_pack_required_for_pull(false, &redaction_only));
3665
3666        let packable = RepositoryTransferPlan::from_object_infos(
3667            vec![ObjectInfo {
3668                id: ObjectId::Hash(blob),
3669                obj_type: ObjectType::Blob,
3670                size: 0,
3671                delta_base: None,
3672            }],
3673            GitLaneTransferIntent::HeddleObjectsOnly,
3674        );
3675        assert!(native_pack_required_for_pull(false, &packable));
3676
3677        let state_with_sidecar = RepositoryTransferPlan::from_object_infos(
3678            vec![state_info(state), state_visibility_info(state)],
3679            GitLaneTransferIntent::HeddleObjectsOnly,
3680        );
3681        assert!(native_pack_required_for_pull(false, &state_with_sidecar));
3682        let empty = RepositoryTransferPlan::from_object_infos(
3683            Vec::<ObjectInfo>::new(),
3684            GitLaneTransferIntent::HeddleObjectsOnly,
3685        );
3686        assert!(native_pack_required_for_pull(true, &empty));
3687    }
3688
3689    #[test]
3690    fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
3691        let (_dir, repo) = temp_repo();
3692        let state = ChangeId::from_bytes([9u8; 16]);
3693        let plan = plan_pull_wants(
3694            &repo,
3695            &state,
3696            false,
3697            vec![
3698                object_descriptor_with_status(
3699                    &state_info(state),
3700                    ObjectAvailabilityStatus::Missing,
3701                    "missing state",
3702                ),
3703                object_descriptor_with_status(
3704                    &state_visibility_info(state),
3705                    ObjectAvailabilityStatus::Missing,
3706                    "missing state visibility",
3707                ),
3708            ],
3709            false,
3710        )
3711        .expect("plan pull wants");
3712
3713        let wanted = plan
3714            .wanted_types
3715            .get(&PackObjectId::ChangeId(state))
3716            .expect("same ChangeId want entry");
3717        assert_eq!(
3718            wanted.as_slice(),
3719            &[ObjectType::State, ObjectType::StateVisibility]
3720        );
3721        assert!(native_pack_required_for_pull(
3722            plan.want_full_closure,
3723            &plan.transfer_plan
3724        ));
3725    }
3726
    /// Test double for `RepoSyncService` whose `pull` serves exactly one
    /// state-visibility sidecar and nothing else.
    ///
    /// `list_refs` and `update_ref` answer with default responses and `push`
    /// returns a stream that ends immediately; only `pull` has real behavior.
    #[derive(Clone)]
    struct SidecarOnlyPullService {
        // State id advertised as the remote state and used in every message.
        state: ChangeId,
        // Payload sent back as the state-visibility transfer body.
        state_visibility_blob: Vec<u8>,
    }

    #[tonic::async_trait]
    impl RepoSyncService for SidecarOnlyPullService {
        /// Always answers with a default `ListRefsResponse`.
        async fn list_refs(
            &self,
            _request: tonic::Request<ListRefsRequest>,
        ) -> Result<Response<ListRefsResponse>, Status> {
            Ok(Response::new(ListRefsResponse::default()))
        }

        /// Always answers with a default `UpdateRefResponse`.
        async fn update_ref(
            &self,
            _request: tonic::Request<UpdateRefRequest>,
        ) -> Result<Response<UpdateRefResponse>, Status> {
            Ok(Response::new(UpdateRefResponse::default()))
        }

        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

        /// Ignores the inbound stream and returns a response stream with no
        /// messages: the sender half is dropped when this function returns.
        async fn push(
            &self,
            _request: tonic::Request<tonic::Streaming<PushMessage>>,
        ) -> Result<Response<Self::PushStream>, Status> {
            let (_tx, rx) = mpsc::channel(1);
            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
                rx,
            )))
        }

        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

        /// Runs a scripted pull handshake on a spawned task:
        ///
        /// 1. expect a `Request` message from the client;
        /// 2. reply `Ready`, advertising a single missing state-visibility object;
        /// 3. expect a `Want` for exactly that one `state_visibility` object,
        ///    without full closure;
        /// 4. send the `StateVisibility` transfer;
        /// 5. send a successful `Complete`.
        ///
        /// Any unexpected inbound message is answered with an
        /// `invalid_argument` status and ends the script.
        async fn pull(
            &self,
            request: tonic::Request<tonic::Streaming<PullMessage>>,
        ) -> Result<Response<Self::PullStream>, Status> {
            // Copy what the task needs so it does not borrow `self`.
            let state = self.state;
            let state_visibility_blob = self.state_visibility_blob.clone();
            let (tx, rx) = mpsc::channel(4);

            tokio::spawn(async move {
                let mut inbound = request.into_inner();
                // Step 1: the first client message must be the pull request.
                match inbound.message().await {
                    Ok(Some(PullMessage {
                        body: Some(pull_message::Body::Request(_)),
                    })) => {}
                    other => {
                        let _ = tx
                            .send(Err(Status::invalid_argument(format!(
                                "expected pull request, got {other:?}"
                            ))))
                            .await;
                        return;
                    }
                }

                // Step 2: advertise one object to fetch — the state-visibility
                // sidecar — with no partial fetch and no full closure.
                let descriptor = object_descriptor_with_status(
                    &state_visibility_info(state),
                    ObjectAvailabilityStatus::Missing,
                    "missing state visibility",
                );
                let ready = PullMessage {
                    body: Some(pull_message::Body::Ready(PullReady {
                        remote_state: state.to_string_full(),
                        objects_to_fetch: vec![descriptor],
                        transfer: None,
                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
                        missing_objects: Vec::new(),
                        full_closure_available: false,
                        object_count: 1,
                        remote_revision_address: RevisionAddress::heddle(state).to_string(),
                    })),
                };
                // A failed send means the receiving side is gone; stop quietly.
                if tx.send(Ok(ready)).await.is_err() {
                    return;
                }

                // Step 3: the client must want only the sidecar object.
                match inbound.message().await {
                    Ok(Some(PullMessage {
                        body: Some(pull_message::Body::Want(want)),
                    })) if !want.want_full_closure
                        && want.objects.len() == 1
                        && want.objects[0].object_type == "state_visibility" => {}
                    other => {
                        let _ = tx
                            .send(Err(Status::invalid_argument(format!(
                                "expected sidecar-only want, got {other:?}"
                            ))))
                            .await;
                        return;
                    }
                }

                // Step 4: deliver the sidecar payload.
                let transfer = PullMessage {
                    body: Some(pull_message::Body::StateVisibility(
                        StateVisibilityTransfer {
                            state_id: state.to_string_full(),
                            state_visibility_blob: state_visibility_blob.into(),
                        },
                    )),
                };
                if tx.send(Ok(transfer)).await.is_err() {
                    return;
                }

                // Step 5: report success with a completed transfer checkpoint.
                let complete = PullMessage {
                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
                        success: true,
                        new_state: state.to_string_full(),
                        error: String::new(),
                        transfer: Some(TransferCheckpoint {
                            transfer_id: "sidecar-only-test".to_string(),
                            transport_mode: TransportMode::NativePack as i32,
                            resume_offset: 0,
                            chunk_index: 0,
                            checkpoint: b"heddle-markers-v1\n".to_vec(),
                            is_complete: true,
                        }),
                        new_revision_address: RevisionAddress::heddle(state).to_string(),
                    })),
                };
                // Last message of the script; a send failure is ignored.
                let _ = tx.send(Ok(complete)).await;
            });

            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
                rx,
            )))
        }
    }
3860
3861    async fn connect_sidecar_only_service(
3862        service: SidecarOnlyPullService,
3863    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3864        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3865            Ok(listener) => listener,
3866            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3867                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3868                return None;
3869            }
3870            Err(err) => panic!("bind test server: {err}"),
3871        };
3872        let addr = listener.local_addr().expect("local addr");
3873        let incoming = futures::stream::unfold(listener, |listener| async {
3874            match listener.accept().await {
3875                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3876                Err(err) => Some((Err(err), listener)),
3877            }
3878        });
3879
3880        let handle = tokio::spawn(async move {
3881            Server::builder()
3882                .add_service(RepoSyncServiceServer::new(service))
3883                .serve_with_incoming(incoming)
3884                .await
3885                .expect("serve sidecar-only test service");
3886        });
3887
3888        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3889            .await
3890            .expect("connect client");
3891        Some((client, handle))
3892    }
3893
3894    #[tokio::test]
3895    async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
3896        let (_dir, repo) = temp_repo();
3897        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3898        let state = State::new_snapshot(
3899            tree_hash,
3900            vec![],
3901            Attribution::human(Principal {
3902                name: "Grace Hopper".into(),
3903                email: "grace@example.com".into(),
3904            }),
3905        );
3906        let state_id = state.change_id;
3907        repo.store().put_state(&state).expect("put state");
3908        assert!(
3909            repo.get_state_visibility_bytes_for_state(&state_id)
3910                .expect("load local sidecar")
3911                .is_none(),
3912            "test starts with state present and StateVisibility sidecar absent"
3913        );
3914
3915        let state_visibility_blob =
3916            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3917                .encode()
3918                .expect("encode state visibility blob");
3919        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3920            state: state_id,
3921            state_visibility_blob,
3922        })
3923        .await
3924        else {
3925            return;
3926        };
3927
3928        let exchange = tokio::time::timeout(
3929            Duration::from_secs(5),
3930            client.pull_exchange(
3931                &repo,
3932                "owner/repo",
3933                "main",
3934                PullOptions {
3935                    local_thread: None,
3936                    depth: None,
3937                    target_state: Some(state_id),
3938                    materialization: PullMaterialization::Full,
3939                },
3940            ),
3941        )
3942        .await
3943        .expect("sidecar-only pull must not hang waiting for native pack")
3944        .expect("sidecar-only pull succeeds");
3945        server.abort();
3946
3947        assert!(exchange.result.success);
3948        assert_eq!(exchange.object_count, 0);
3949        assert_eq!(exchange.profile.pack_bytes_received, 0);
3950        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3951        assert!(
3952            repo.get_state_visibility_for_state(&state_id)
3953                .expect("load accepted sidecar")
3954                .has_record(),
3955            "pull must accept the out-of-pack StateVisibility sidecar"
3956        );
3957    }
3958
3959    // A sidecar blob larger than tonic's 4 MiB default decode limit but well
3960    // under the 64 MiB receive cap must still decode + install: the raised
3961    // `max_decoding_message_size` is the bound, and it isn't set too tight.
3962    #[tokio::test]
3963    async fn legitimate_large_sidecar_blob_decodes_and_installs() {
3964        let (_dir, repo) = temp_repo();
3965        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3966        let state = State::new_snapshot(
3967            tree_hash,
3968            vec![],
3969            Attribution::human(Principal {
3970                name: "Grace Hopper".into(),
3971                email: "grace@example.com".into(),
3972            }),
3973        );
3974        let state_id = state.change_id;
3975        repo.store().put_state(&state).expect("put state");
3976
3977        // ~8 MiB blob: above tonic's 4 MiB default (which would reject at
3978        // decode without the raised limit), below the 64 MiB sidecar cap.
3979        let mut record = sample_state_visibility(state_id);
3980        record.tier = VisibilityTier::Restricted {
3981            scope_label: "x".repeat(8 * 1024 * 1024),
3982        };
3983        let state_visibility_blob = StateVisibilityBlob::new(vec![record])
3984            .encode()
3985            .expect("encode large state visibility blob");
3986        assert!(
3987            state_visibility_blob.len() > 4 * 1024 * 1024,
3988            "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
3989        );
3990        assert!(
3991            (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
3992            "blob must stay within the legitimate sidecar receive cap"
3993        );
3994
3995        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3996            state: state_id,
3997            state_visibility_blob,
3998        })
3999        .await
4000        else {
4001            return;
4002        };
4003
4004        let exchange = tokio::time::timeout(
4005            Duration::from_secs(30),
4006            client.pull_exchange(
4007                &repo,
4008                "owner/repo",
4009                "main",
4010                PullOptions {
4011                    local_thread: None,
4012                    depth: None,
4013                    target_state: Some(state_id),
4014                    materialization: PullMaterialization::Full,
4015                },
4016            ),
4017        )
4018        .await
4019        .expect("large-sidecar pull must not hang")
4020        .expect("large but legitimate sidecar pull succeeds");
4021        server.abort();
4022
4023        assert!(exchange.result.success);
4024        assert!(
4025            repo.get_state_visibility_for_state(&state_id)
4026                .expect("load accepted sidecar")
4027                .has_record(),
4028            "pull must accept a legitimately-large StateVisibility sidecar"
4029        );
4030    }
4031
4032    // A sidecar blob beyond the pull-stream decode limit must be rejected at
4033    // the gRPC decode boundary — before its `Vec<u8>` is materialized — not by
4034    // the cheaper post-decode `check_received_transfer_blob_size` guard.
4035    #[tokio::test]
4036    async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
4037        let (_dir, repo) = temp_repo();
4038        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
4039        let state = State::new_snapshot(
4040            tree_hash,
4041            vec![],
4042            Attribution::human(Principal {
4043                name: "Grace Hopper".into(),
4044                email: "grace@example.com".into(),
4045            }),
4046        );
4047        let state_id = state.change_id;
4048        repo.store().put_state(&state).expect("put state");
4049
4050        // One byte past the decode limit. Content is irrelevant: decode is
4051        // refused before the blob is ever handed to the accept path.
4052        let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
4053        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
4054            state: state_id,
4055            state_visibility_blob: oversized,
4056        })
4057        .await
4058        else {
4059            return;
4060        };
4061
4062        let result = tokio::time::timeout(
4063            Duration::from_secs(30),
4064            client.pull_exchange(
4065                &repo,
4066                "owner/repo",
4067                "main",
4068                PullOptions {
4069                    local_thread: None,
4070                    depth: None,
4071                    target_state: Some(state_id),
4072                    materialization: PullMaterialization::Full,
4073                },
4074            ),
4075        )
4076        .await
4077        .expect("oversized-sidecar pull must not hang");
4078        server.abort();
4079
4080        let err = match result {
4081            Err(err) => err,
4082            Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
4083        };
4084        let message = err.to_string();
4085        assert!(
4086            !message.contains("exceeds receive size limit"),
4087            "rejection must come from the decode-size limit, before the post-decode check: {message}"
4088        );
4089        assert!(
4090            repo.get_state_visibility_for_state(&state_id)
4091                .expect("load sidecar")
4092                .latest()
4093                .expect("resolve visibility")
4094                .is_none(),
4095            "an oversized sidecar must never be installed"
4096        );
4097    }
4098
4099    #[derive(Clone)]
4100    struct StateAndVisibilityPullService {
4101        state: ChangeId,
4102        pack_bundle: wire::NativePackBundle,
4103        state_visibility_blob: Vec<u8>,
4104    }
4105
4106    #[tonic::async_trait]
4107    impl RepoSyncService for StateAndVisibilityPullService {
4108        async fn list_refs(
4109            &self,
4110            _request: tonic::Request<ListRefsRequest>,
4111        ) -> Result<Response<ListRefsResponse>, Status> {
4112            Ok(Response::new(ListRefsResponse::default()))
4113        }
4114
4115        async fn update_ref(
4116            &self,
4117            _request: tonic::Request<UpdateRefRequest>,
4118        ) -> Result<Response<UpdateRefResponse>, Status> {
4119            Ok(Response::new(UpdateRefResponse::default()))
4120        }
4121
4122        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
4123
4124        async fn push(
4125            &self,
4126            _request: tonic::Request<tonic::Streaming<PushMessage>>,
4127        ) -> Result<Response<Self::PushStream>, Status> {
4128            let (_tx, rx) = mpsc::channel(1);
4129            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4130                rx,
4131            )))
4132        }
4133
4134        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
4135
4136        async fn pull(
4137            &self,
4138            request: tonic::Request<tonic::Streaming<PullMessage>>,
4139        ) -> Result<Response<Self::PullStream>, Status> {
4140            let state = self.state;
4141            let pack_bundle = self.pack_bundle.clone();
4142            let state_visibility_blob = self.state_visibility_blob.clone();
4143            let (tx, rx) = mpsc::channel(8);
4144
4145            tokio::spawn(async move {
4146                let mut inbound = request.into_inner();
4147                match inbound.message().await {
4148                    Ok(Some(PullMessage {
4149                        body: Some(pull_message::Body::Request(_)),
4150                    })) => {}
4151                    other => {
4152                        let _ = tx
4153                            .send(Err(Status::invalid_argument(format!(
4154                                "expected pull request, got {other:?}"
4155                            ))))
4156                            .await;
4157                        return;
4158                    }
4159                }
4160
4161                let ready = PullMessage {
4162                    body: Some(pull_message::Body::Ready(PullReady {
4163                        remote_state: state.to_string_full(),
4164                        objects_to_fetch: vec![
4165                            object_descriptor_with_status(
4166                                &state_info(state),
4167                                ObjectAvailabilityStatus::Missing,
4168                                "missing state",
4169                            ),
4170                            object_descriptor_with_status(
4171                                &state_visibility_info(state),
4172                                ObjectAvailabilityStatus::Missing,
4173                                "missing state visibility",
4174                            ),
4175                        ],
4176                        transfer: None,
4177                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
4178                        missing_objects: Vec::new(),
4179                        full_closure_available: false,
4180                        object_count: 2,
4181                        remote_revision_address: RevisionAddress::heddle(state).to_string(),
4182                    })),
4183                };
4184                if tx.send(Ok(ready)).await.is_err() {
4185                    return;
4186                }
4187
4188                match inbound.message().await {
4189                    Ok(Some(PullMessage {
4190                        body: Some(pull_message::Body::Want(want)),
4191                    })) if !want.want_full_closure
4192                        && want.objects.len() == 2
4193                        && want
4194                            .objects
4195                            .iter()
4196                            .any(|object| object.object_type == "state")
4197                        && want
4198                            .objects
4199                            .iter()
4200                            .any(|object| object.object_type == "state_visibility") => {}
4201                    other => {
4202                        let _ = tx
4203                            .send(Err(Status::invalid_argument(format!(
4204                                "expected state + sidecar wants, got {other:?}"
4205                            ))))
4206                            .await;
4207                        return;
4208                    }
4209                }
4210
4211                for message in
4212                    encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
4213                {
4214                    if tx.send(Ok(message)).await.is_err() {
4215                        return;
4216                    }
4217                }
4218
4219                let transfer = PullMessage {
4220                    body: Some(pull_message::Body::StateVisibility(
4221                        StateVisibilityTransfer {
4222                            state_id: state.to_string_full(),
4223                            state_visibility_blob: state_visibility_blob.into(),
4224                        },
4225                    )),
4226                };
4227                if tx.send(Ok(transfer)).await.is_err() {
4228                    return;
4229                }
4230
4231                let complete = PullMessage {
4232                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
4233                        success: true,
4234                        new_state: state.to_string_full(),
4235                        error: String::new(),
4236                        transfer: Some(TransferCheckpoint {
4237                            transfer_id: "state-and-visibility-test".to_string(),
4238                            transport_mode: TransportMode::NativePack as i32,
4239                            resume_offset: 0,
4240                            chunk_index: 0,
4241                            checkpoint: b"heddle-markers-v1\n".to_vec(),
4242                            is_complete: true,
4243                        }),
4244                        new_revision_address: RevisionAddress::heddle(state).to_string(),
4245                    })),
4246                };
4247                let _ = tx.send(Ok(complete)).await;
4248            });
4249
4250            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4251                rx,
4252            )))
4253        }
4254    }
4255
4256    fn encode_pull_native_pack_messages(
4257        bundle: &wire::NativePackBundle,
4258        transfer_id: &str,
4259        chunk_size: usize,
4260    ) -> Vec<PullMessage> {
4261        let mut messages = Vec::new();
4262        let chunk_size = chunk_size.max(1);
4263
4264        let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
4265        for chunk_index in 0..pack_total_chunks.max(1) {
4266            let Some((start, len)) =
4267                wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
4268            else {
4269                break;
4270            };
4271            messages.push(PullMessage {
4272                body: Some(pull_message::Body::Pack(PackChunk {
4273                    stream_kind: PackStreamKind::Pack as i32,
4274                    data: bundle.pack_data[start..start + len].to_vec().into(),
4275                    transfer: Some(TransferCheckpoint {
4276                        transfer_id: transfer_id.to_string(),
4277                        transport_mode: TransportMode::NativePack as i32,
4278                        resume_offset: start as u64,
4279                        chunk_index: chunk_index as u32,
4280                        checkpoint: Vec::new(),
4281                        is_complete: chunk_index + 1 == pack_total_chunks,
4282                    }),
4283                    chunk_length: len as u32,
4284                    is_final_chunk: chunk_index + 1 == pack_total_chunks,
4285                })),
4286            });
4287        }
4288
4289        let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
4290        for chunk_index in 0..index_total_chunks.max(1) {
4291            let Some((start, len)) =
4292                wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
4293            else {
4294                break;
4295            };
4296            messages.push(PullMessage {
4297                body: Some(pull_message::Body::Pack(PackChunk {
4298                    stream_kind: PackStreamKind::Index as i32,
4299                    data: bundle.index_data[start..start + len].to_vec().into(),
4300                    transfer: Some(TransferCheckpoint {
4301                        transfer_id: transfer_id.to_string(),
4302                        transport_mode: TransportMode::NativePack as i32,
4303                        resume_offset: start as u64,
4304                        chunk_index: chunk_index as u32,
4305                        checkpoint: Vec::new(),
4306                        is_complete: chunk_index + 1 == index_total_chunks,
4307                    }),
4308                    chunk_length: len as u32,
4309                    is_final_chunk: chunk_index + 1 == index_total_chunks,
4310                })),
4311            });
4312        }
4313
4314        messages
4315    }
4316
4317    async fn connect_state_and_visibility_service(
4318        service: StateAndVisibilityPullService,
4319    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4320        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4321            Ok(listener) => listener,
4322            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4323                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4324                return None;
4325            }
4326            Err(err) => panic!("bind test server: {err}"),
4327        };
4328        let addr = listener.local_addr().expect("local addr");
4329        let incoming = futures::stream::unfold(listener, |listener| async {
4330            match listener.accept().await {
4331                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4332                Err(err) => Some((Err(err), listener)),
4333            }
4334        });
4335
4336        let handle = tokio::spawn(async move {
4337            Server::builder()
4338                .add_service(RepoSyncServiceServer::new(service))
4339                .serve_with_incoming(incoming)
4340                .await
4341                .expect("serve state-and-visibility test service");
4342        });
4343
4344        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4345            .await
4346            .expect("connect client");
4347        Some((client, handle))
4348    }
4349
4350    #[tokio::test]
4351    async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
4352        let (_source_dir, source_repo) = temp_repo();
4353        let (_target_dir, target_repo) = temp_repo();
4354        let tree_hash = source_repo
4355            .store()
4356            .put_tree(&Tree::new())
4357            .expect("put tree");
4358        let state = State::new_snapshot(
4359            tree_hash,
4360            vec![],
4361            Attribution::human(Principal {
4362                name: "Grace Hopper".into(),
4363                email: "grace@example.com".into(),
4364            }),
4365        );
4366        let state_id = state.change_id;
4367        source_repo
4368            .store()
4369            .put_state(&state)
4370            .expect("put source state");
4371        let state_visibility_blob =
4372            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
4373                .encode()
4374                .expect("encode state visibility blob");
4375        source_repo
4376            .accept_wire_state_visibility(state_id, &state_visibility_blob)
4377            .expect("put source state visibility");
4378        let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
4379            .expect("build state pack");
4380
4381        assert!(
4382            target_repo
4383                .store()
4384                .get_state(&state_id)
4385                .expect("load target state")
4386                .is_none(),
4387            "test starts with state absent"
4388        );
4389        assert!(
4390            target_repo
4391                .get_state_visibility_bytes_for_state(&state_id)
4392                .expect("load target sidecar")
4393                .is_none(),
4394            "test starts with StateVisibility sidecar absent"
4395        );
4396
4397        let Some((mut client, server)) =
4398            connect_state_and_visibility_service(StateAndVisibilityPullService {
4399                state: state_id,
4400                pack_bundle,
4401                state_visibility_blob,
4402            })
4403            .await
4404        else {
4405            return;
4406        };
4407
4408        let exchange = tokio::time::timeout(
4409            Duration::from_secs(5),
4410            client.pull_exchange(
4411                &target_repo,
4412                "owner/repo",
4413                "main",
4414                PullOptions {
4415                    local_thread: None,
4416                    depth: None,
4417                    target_state: Some(state_id),
4418                    materialization: PullMaterialization::Full,
4419                },
4420            ),
4421        )
4422        .await
4423        .expect("state + sidecar pull must not hang waiting for native pack")
4424        .expect("state + sidecar pull succeeds");
4425        server.abort();
4426
4427        assert!(exchange.result.success);
4428        assert_eq!(exchange.object_count, 1);
4429        assert!(exchange.profile.pack_bytes_received > 0);
4430        assert_eq!(exchange.profile.object_mix.states, 1);
4431        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
4432        assert!(
4433            target_repo
4434                .store()
4435                .get_state(&state_id)
4436                .expect("load installed state")
4437                .is_some(),
4438            "native pack must install the State"
4439        );
4440        assert!(
4441            target_repo
4442                .get_state_visibility_for_state(&state_id)
4443                .expect("load accepted sidecar")
4444                .has_record(),
4445            "pull must accept the out-of-pack StateVisibility sidecar"
4446        );
4447    }
4448
4449    #[test]
4450    fn missing_blobs_in_tree_treats_absent_tree_as_empty() {
4451        let (_dir, repo) = temp_repo();
4452        let absent_tree = ContentHash::from_bytes([99u8; 32]);
4453
4454        let missing = wire::missing_blobs_in_tree(repo.store(), absent_tree)
4455            .expect("absent tree is not an error");
4456
4457        assert!(missing.is_empty());
4458    }
4459
4460    #[test]
4461    fn missing_blobs_in_tree_reports_only_genuinely_missing_blobs() {
4462        let (_dir, repo) = temp_repo();
4463        let present_blob = Blob::from("already local");
4464        let present_hash = repo.store().put_blob(&present_blob).expect("put blob");
4465        let missing_hash = ContentHash::from_bytes([42u8; 32]);
4466        let tree = Tree::from_entries(vec![
4467            TreeEntry::file("local.txt", present_hash, false).expect("present entry"),
4468            TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry"),
4469        ]);
4470        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4471
4472        let missing =
4473            wire::missing_blobs_in_tree(repo.store(), tree_hash).expect("collect missing blobs");
4474
4475        assert_eq!(missing, vec![missing_hash]);
4476    }
4477
4478    #[test]
4479    fn missing_blobs_in_tree_reports_corrupt_tree_read() {
4480        let (_dir, repo) = temp_repo();
4481        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
4482        std::fs::write(loose_tree_path(&repo, &tree_hash), [0xc1]).expect("corrupt tree");
4483        repo.store().clear_recent_caches();
4484
4485        let err = wire::missing_blobs_in_tree(repo.store(), tree_hash)
4486            .expect_err("corrupt tree must fail");
4487
4488        assert!(matches!(err, ProtocolError::InvalidState(_)));
4489        assert!(
4490            err.to_string().contains(&format!(
4491                "load tree {} while collecting lazy hydration missing blobs",
4492                tree_hash.to_hex()
4493            )),
4494            "unexpected error: {err}"
4495        );
4496    }
4497
4498    #[test]
4499    fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
4500        let (_dir, repo) = temp_repo();
4501        let blob = sample_blob();
4502        repo.put_redaction(sample_redaction(blob))
4503            .expect("put redaction");
4504        let expected_bytes = repo
4505            .store()
4506            .get_redactions_bytes_for_blob(&blob)
4507            .expect("load sidecar")
4508            .expect("sidecar exists");
4509
4510        let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");
4511
4512        let Some(push_message::Body::Redaction(transfer)) = message.body else {
4513            panic!("expected redaction transfer");
4514        };
4515        assert_eq!(transfer.blob_hash, blob.to_hex());
4516        assert_eq!(transfer.redactions_blob, expected_bytes);
4517    }
4518
4519    #[test]
4520    fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
4521        let (_dir, repo) = temp_repo();
4522        let blob = sample_blob();
4523
4524        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("missing sidecar");
4525
4526        assert!(matches!(err, ProtocolError::InvalidState(_)));
4527        assert!(
4528            err.to_string().contains(&format!(
4529                "server wants redaction for blob {} but sender has no sidecar",
4530                blob.to_hex()
4531            )),
4532            "unexpected error: {err}"
4533        );
4534    }
4535
4536    #[test]
4537    fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
4538        let (_dir, repo) = temp_repo();
4539        let blob = sample_blob();
4540        let redaction_path = repo
4541            .heddle_dir()
4542            .join("redactions")
4543            .join(format!("{}.bin", blob.to_hex()));
4544        std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");
4545
4546        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("load error");
4547
4548        assert!(matches!(err, ProtocolError::InvalidState(_)));
4549        assert!(
4550            err.to_string()
4551                .contains(&format!("load redactions sidecar for {}:", blob.to_hex())),
4552            "unexpected error: {err}"
4553        );
4554    }
4555
4556    #[test]
4557    fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
4558        let (_dir, repo) = temp_repo();
4559        let state = ChangeId::from_bytes([17u8; 16]);
4560        repo.put_state_visibility(sample_state_visibility(state))
4561            .expect("put state visibility");
4562        let expected_bytes = repo
4563            .get_state_visibility_bytes_for_state(&state)
4564            .expect("load sidecar")
4565            .expect("sidecar exists");
4566
4567        let message =
4568            state_visibility_push_message(&repo, state_visibility_info(state)).expect("message");
4569
4570        let Some(push_message::Body::StateVisibility(transfer)) = message.body else {
4571            panic!("expected state visibility transfer");
4572        };
4573        assert_eq!(transfer.state_id, state.to_string_full());
4574        assert_eq!(transfer.state_visibility_blob, expected_bytes);
4575    }
4576
4577    // ---- bare-pull head advertisement (delta sync) ----
4578
4579    fn sample_attribution() -> Attribution {
4580        Attribution::human(Principal {
4581            name: "Grace Hopper".into(),
4582            email: "grace@example.com".into(),
4583        })
4584    }
4585
4586    /// Put a state whose tree holds one blob into `repo`'s store. Returns the
4587    /// state id. With `parents`, builds a child on top of an existing state.
4588    fn put_state_with_blob(repo: &Repository, contents: &str, parents: Vec<ChangeId>) -> ChangeId {
4589        let blob = Blob::from(contents);
4590        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4591        let tree = Tree::from_entries(vec![
4592            TreeEntry::file(format!("{contents}.txt"), blob_hash, false).expect("tree entry"),
4593        ]);
4594        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4595        let state = State::new_snapshot(tree_hash, parents, sample_attribution());
4596        let state_id = state.change_id;
4597        repo.store().put_state(&state).expect("put state");
4598        state_id
4599    }
4600
4601    #[test]
4602    fn locally_complete_pull_head_advertises_fully_present_thread_head() {
4603        let (_dir, repo) = temp_repo();
4604        let head = put_state_with_blob(&repo, "alpha", vec![]);
4605        repo.refs()
4606            .set_thread(&ThreadName::from("main"), &head)
4607            .expect("set thread");
4608
4609        let advertised =
4610            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4611        assert_eq!(
4612            advertised,
4613            Some(head),
4614            "a thread head whose full closure is present locally must be advertised"
4615        );
4616    }
4617
4618    #[test]
4619    fn locally_complete_pull_head_skips_unknown_thread() {
4620        // Unseeded local repo: the bare-pull thread has no local head, so
4621        // there is nothing to advertise and the pull falls back to a full
4622        // closure.
4623        let (_dir, repo) = temp_repo_unseeded();
4624        let advertised =
4625            locally_complete_pull_head(&repo, "nonexistent", None).expect("completeness check");
4626        assert_eq!(advertised, None);
4627    }
4628
4629    #[test]
4630    fn locally_complete_pull_head_skips_when_target_state_override_in_play() {
4631        let (_dir, repo) = temp_repo();
4632        let head = put_state_with_blob(&repo, "alpha", vec![]);
4633        repo.refs()
4634            .set_thread(&ThreadName::from("main"), &head)
4635            .expect("set thread");
4636
4637        // A fetch_state-style override drives the want plan directly; the
4638        // thread head is unrelated to what's being fetched.
4639        let advertised =
4640            locally_complete_pull_head(&repo, "main", Some(head)).expect("completeness check");
4641        assert_eq!(advertised, None);
4642    }
4643
4644    #[test]
4645    fn locally_complete_pull_head_skips_repo_with_missing_blobs() {
4646        // A partial/lazy clone records known-missing blobs: it may hold a
4647        // state's metadata while lacking blob content. Advertising such a
4648        // head would make the server omit objects and silently leave us with
4649        // an incomplete repo — so the completeness gate must refuse.
4650        let (_dir, repo) = temp_repo();
4651        let head = put_state_with_blob(&repo, "alpha", vec![]);
4652        repo.refs()
4653            .set_thread(&ThreadName::from("main"), &head)
4654            .expect("set thread");
4655        repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4656            .expect("record missing blob");
4657
4658        let advertised =
4659            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4660        assert_eq!(
4661            advertised, None,
4662            "a repo carrying missing blobs must never advertise a head"
4663        );
4664    }
4665
4666    #[test]
4667    fn locally_complete_pull_head_skips_head_with_incomplete_closure() {
4668        // The thread head's *state* is present, but a parent state in its
4669        // closure is absent. Walking the closure surfaces `ObjectNotFound`,
4670        // so the head must NOT be advertised. This is the dangerous case the
4671        // cardinal correctness constraint guards against.
4672        let (_dir, repo) = temp_repo();
4673        let absent_parent = ChangeId::generate();
4674        let blob = Blob::from("orphan");
4675        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4676        let tree = Tree::from_entries(vec![
4677            TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4678        ]);
4679        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4680        // Child references a parent state that is NOT in the store.
4681        let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4682        let child_id = child.change_id;
4683        repo.store().put_state(&child).expect("put child state");
4684        repo.refs()
4685            .set_thread(&ThreadName::from("main"), &child_id)
4686            .expect("set thread");
4687
4688        let advertised =
4689            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4690        assert_eq!(
4691            advertised, None,
4692            "a head whose closure has an absent parent state must not be advertised"
4693        );
4694    }
4695
4696    #[test]
4697    fn locally_complete_local_thread_head_advertises_fully_present_thread_head() {
4698        // The explicit `--local-thread` happy path: the named thread's head
4699        // closure is fully local, so it may be advertised (fast delta path).
4700        let (_dir, repo) = temp_repo();
4701        let head = put_state_with_blob(&repo, "alpha", vec![]);
4702        repo.refs()
4703            .set_thread(&ThreadName::from("feature"), &head)
4704            .expect("set thread");
4705
4706        let advertised =
4707            locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4708        assert_eq!(
4709            advertised,
4710            Some(head),
4711            "an explicit local-thread head whose full closure is present must be advertised"
4712        );
4713    }
4714
4715    #[test]
4716    fn locally_complete_local_thread_head_skips_unknown_thread() {
4717        // `--local-thread` naming a thread with no local head: nothing to
4718        // advertise, falls back to a full closure.
4719        let (_dir, repo) = temp_repo_unseeded();
4720        let advertised = locally_complete_local_thread_head(&repo, "nonexistent", None)
4721            .expect("completeness check");
4722        assert_eq!(advertised, None);
4723    }
4724
4725    #[test]
4726    fn locally_complete_local_thread_head_skips_when_target_state_override_in_play() {
4727        let (_dir, repo) = temp_repo();
4728        let head = put_state_with_blob(&repo, "alpha", vec![]);
4729        repo.refs()
4730            .set_thread(&ThreadName::from("feature"), &head)
4731            .expect("set thread");
4732
4733        // A target-state override drives the want plan directly; the explicit
4734        // thread head is unrelated and must not be advertised.
4735        let advertised = locally_complete_local_thread_head(&repo, "feature", Some(head))
4736            .expect("completeness check");
4737        assert_eq!(advertised, None);
4738    }
4739
4740    #[test]
4741    fn locally_complete_local_thread_head_skips_repo_with_missing_blobs() {
4742        // A partial/lazy clone named via `--local-thread`: holds metadata but
4743        // records known-missing blobs. The gate must refuse.
4744        let (_dir, repo) = temp_repo();
4745        let head = put_state_with_blob(&repo, "alpha", vec![]);
4746        repo.refs()
4747            .set_thread(&ThreadName::from("feature"), &head)
4748            .expect("set thread");
4749        repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4750            .expect("record missing blob");
4751
4752        let advertised =
4753            locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4754        assert_eq!(
4755            advertised, None,
4756            "a repo carrying missing blobs must never advertise an explicit local-thread head"
4757        );
4758    }
4759
4760    #[test]
4761    fn locally_complete_local_thread_head_skips_head_with_incomplete_closure() {
4762        // The cardinal case for the explicit branch: the named thread's head
4763        // state is present, but a parent state in its closure is absent — an
4764        // interrupted prior pull or partial clone. Advertising this head would
4765        // make the server prune objects we lack. The gate must refuse.
4766        let (_dir, repo) = temp_repo();
4767        let absent_parent = ChangeId::generate();
4768        let blob = Blob::from("orphan");
4769        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4770        let tree = Tree::from_entries(vec![
4771            TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4772        ]);
4773        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4774        let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4775        let child_id = child.change_id;
4776        repo.store().put_state(&child).expect("put child state");
4777        repo.refs()
4778            .set_thread(&ThreadName::from("feature"), &child_id)
4779            .expect("set thread");
4780
4781        let advertised =
4782            locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4783        assert_eq!(
4784            advertised, None,
4785            "an explicit local-thread head whose closure has an absent parent must not be advertised"
4786        );
4787    }
4788
4789    /// Mock pull service that mimics the weft server's `exclude_states`
4790    /// contract: it captures the inbound `exclude_states`, fires the
4791    /// zero-delta short-circuit when the remote tip is advertised, and
4792    /// otherwise sends only the objects NOT covered by an advertised
4793    /// (parent) closure.
4794    #[derive(Clone)]
4795    struct DeltaAwarePullService {
4796        remote_state: ChangeId,
4797        /// Object descriptors keyed by the parent state that an advertised
4798        /// `exclude_states` entry would cover. If `exclude_states` contains
4799        /// `remote_state`, the delta is empty (short-circuit). If it contains
4800        /// a known parent, only the child-specific objects are sent. If it
4801        /// contains neither, the full closure is sent.
4802        full_closure: Vec<ObjectInfo>,
4803        delta_objects: Vec<ObjectInfo>,
4804        known_parent: ChangeId,
4805        full_pack: wire::NativePackBundle,
4806        delta_pack: wire::NativePackBundle,
4807        captured_exclude: std::sync::Arc<std::sync::Mutex<Option<Vec<String>>>>,
4808    }
4809
4810    #[tonic::async_trait]
4811    impl RepoSyncService for DeltaAwarePullService {
4812        async fn list_refs(
4813            &self,
4814            _request: tonic::Request<ListRefsRequest>,
4815        ) -> Result<Response<ListRefsResponse>, Status> {
4816            Ok(Response::new(ListRefsResponse::default()))
4817        }
4818
4819        async fn update_ref(
4820            &self,
4821            _request: tonic::Request<UpdateRefRequest>,
4822        ) -> Result<Response<UpdateRefResponse>, Status> {
4823            Ok(Response::new(UpdateRefResponse::default()))
4824        }
4825
4826        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
4827
4828        async fn push(
4829            &self,
4830            _request: tonic::Request<tonic::Streaming<PushMessage>>,
4831        ) -> Result<Response<Self::PushStream>, Status> {
4832            let (_tx, rx) = mpsc::channel(1);
4833            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4834                rx,
4835            )))
4836        }
4837
4838        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
4839
4840        async fn pull(
4841            &self,
4842            request: tonic::Request<tonic::Streaming<PullMessage>>,
4843        ) -> Result<Response<Self::PullStream>, Status> {
4844            let svc = self.clone();
4845            let (tx, rx) = mpsc::channel(16);
4846
4847            tokio::spawn(async move {
4848                let mut inbound = request.into_inner();
4849                let exclude = match inbound.message().await {
4850                    Ok(Some(PullMessage {
4851                        body: Some(pull_message::Body::Request(req)),
4852                    })) => req.exclude_states,
4853                    other => {
4854                        let _ = tx
4855                            .send(Err(Status::invalid_argument(format!(
4856                                "expected pull request, got {other:?}"
4857                            ))))
4858                            .await;
4859                        return;
4860                    }
4861                };
4862                *svc.captured_exclude.lock().unwrap() = Some(exclude.clone());
4863
4864                let remote_full = svc.remote_state.to_string_full();
4865                let parent_full = svc.known_parent.to_string_full();
4866
4867                // Mirror the server contract: subtract advertised closures.
4868                let (objects, pack, short_circuit) = if exclude.contains(&remote_full) {
4869                    // weft#215 zero-delta short-circuit: client is at the tip.
4870                    (Vec::new(), None, true)
4871                } else if exclude.contains(&parent_full) {
4872                    // Client is behind at `known_parent`; send only the delta.
4873                    (
4874                        svc.delta_objects.clone(),
4875                        Some(svc.delta_pack.clone()),
4876                        false,
4877                    )
4878                } else {
4879                    // No usable advertisement; send the full closure.
4880                    (svc.full_closure.clone(), Some(svc.full_pack.clone()), false)
4881                };
4882
4883                let descriptors: Vec<_> = objects
4884                    .iter()
4885                    .map(|info| {
4886                        object_descriptor_with_status(
4887                            info,
4888                            ObjectAvailabilityStatus::Missing,
4889                            "requested by client",
4890                        )
4891                    })
4892                    .collect();
4893
4894                let ready = PullMessage {
4895                    body: Some(pull_message::Body::Ready(PullReady {
4896                        remote_state: remote_full.clone(),
4897                        objects_to_fetch: descriptors,
4898                        transfer: None,
4899                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
4900                        missing_objects: Vec::new(),
4901                        full_closure_available: false,
4902                        object_count: objects.len() as u32,
4903                        remote_revision_address: RevisionAddress::heddle(svc.remote_state)
4904                            .to_string(),
4905                    })),
4906                };
4907                if tx.send(Ok(ready)).await.is_err() {
4908                    return;
4909                }
4910
4911                // Expect the client's Want.
4912                match inbound.message().await {
4913                    Ok(Some(PullMessage {
4914                        body: Some(pull_message::Body::Want(_)),
4915                    })) => {}
4916                    other => {
4917                        let _ = tx
4918                            .send(Err(Status::invalid_argument(format!(
4919                                "expected want, got {other:?}"
4920                            ))))
4921                            .await;
4922                        return;
4923                    }
4924                }
4925
4926                if let Some(pack) = pack
4927                    && !short_circuit
4928                {
4929                    for message in encode_pull_native_pack_messages(&pack, "delta-aware-test", 64) {
4930                        if tx.send(Ok(message)).await.is_err() {
4931                            return;
4932                        }
4933                    }
4934                }
4935
4936                let complete = PullMessage {
4937                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
4938                        success: true,
4939                        new_state: remote_full,
4940                        error: String::new(),
4941                        transfer: Some(TransferCheckpoint {
4942                            transfer_id: "delta-aware-test".to_string(),
4943                            transport_mode: TransportMode::NativePack as i32,
4944                            resume_offset: 0,
4945                            chunk_index: 0,
4946                            checkpoint: b"heddle-markers-v1\n".to_vec(),
4947                            is_complete: true,
4948                        }),
4949                        new_revision_address: RevisionAddress::heddle(svc.remote_state)
4950                            .to_string(),
4951                    })),
4952                };
4953                let _ = tx.send(Ok(complete)).await;
4954            });
4955
4956            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4957                rx,
4958            )))
4959        }
4960    }
4961
4962    async fn connect_delta_aware_service(
4963        service: DeltaAwarePullService,
4964    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4965        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4966            Ok(listener) => listener,
4967            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4968                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4969                return None;
4970            }
4971            Err(err) => panic!("bind test server: {err}"),
4972        };
4973        let addr = listener.local_addr().expect("local addr");
4974        let incoming = futures::stream::unfold(listener, |listener| async {
4975            match listener.accept().await {
4976                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4977                Err(err) => Some((Err(err), listener)),
4978            }
4979        });
4980        let handle = tokio::spawn(async move {
4981            Server::builder()
4982                .add_service(RepoSyncServiceServer::new(service))
4983                .serve_with_incoming(incoming)
4984                .await
4985                .expect("serve delta-aware test service");
4986        });
4987        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4988            .await
4989            .expect("connect client");
4990        Some((client, handle))
4991    }
4992
4993    #[tokio::test]
4994    async fn warm_bare_pull_advertises_head_and_fires_zero_delta_short_circuit() {
4995        // Client is exactly at the remote tip. A bare pull must advertise that
4996        // tip as exclude_states; the server then returns an empty delta
4997        // (weft#215 short-circuit) and the client transfers nothing.
4998        let (_dir, repo) = temp_repo();
4999        let head = put_state_with_blob(&repo, "tip", vec![]);
5000        repo.refs()
5001            .set_thread(&ThreadName::from("main"), &head)
5002            .expect("set thread");
5003
5004        let full_closure =
5005            wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
5006        let full_pack =
5007            wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
5008        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5009
5010        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5011            remote_state: head,
5012            full_closure,
5013            delta_objects: Vec::new(),
5014            known_parent: ChangeId::generate(),
5015            full_pack: full_pack.clone(),
5016            delta_pack: full_pack,
5017            captured_exclude: captured.clone(),
5018        })
5019        .await
5020        else {
5021            return;
5022        };
5023
5024        let exchange = tokio::time::timeout(
5025            Duration::from_secs(5),
5026            client.pull_exchange(
5027                &repo,
5028                "owner/repo",
5029                "main",
5030                PullOptions {
5031                    local_thread: None,
5032                    depth: None,
5033                    target_state: None,
5034                    materialization: PullMaterialization::Full,
5035                },
5036            ),
5037        )
5038        .await
5039        .expect("warm bare pull must not hang")
5040        .expect("warm bare pull succeeds");
5041        server.abort();
5042
5043        let advertised = captured.lock().unwrap().clone().expect("request captured");
5044        assert_eq!(
5045            advertised,
5046            vec![head.to_string_full()],
5047            "a bare pull at the tip must advertise that tip in exclude_states"
5048        );
5049        assert!(exchange.result.success);
5050        assert_eq!(
5051            exchange.object_count, 0,
5052            "the zero-delta short-circuit must transfer no objects, not the full closure"
5053        );
5054    }
5055
5056    #[tokio::test]
5057    async fn behind_client_bare_pull_receives_exactly_the_missing_delta() {
5058        // The dangerous direction: client sits at the PARENT state, remote is
5059        // at the CHILD. A bare pull advertises the parent (whose closure the
5060        // client fully holds); the server sends only the child-specific
5061        // objects, and the client must end up with a COMPLETE repo (no object
5062        // it lacked is dropped).
5063        // Build parent + child in the SOURCE repo (the server's view).
5064        let (_src_dir, src_repo) = temp_repo_unseeded();
5065        let parent = put_state_with_blob(&src_repo, "base", vec![]);
5066        let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
5067        let child_closure =
5068            wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
5069
5070        // The CLIENT holds exactly the parent closure (the dangerous "behind"
5071        // setup): copy the parent's objects into a fresh client store and
5072        // track `main` at the parent.
5073        let (_dir, repo) = temp_repo_unseeded();
5074        let parent_closure =
5075            wire::enumerate_state_closure(src_repo.store(), parent).expect("parent closure");
5076        let parent_pack =
5077            wire::build_native_pack(src_repo.store(), &parent_closure).expect("parent pack");
5078        wire::install_received_pack(
5079            repo.store(),
5080            &parent_pack.pack_data,
5081            &parent_pack.index_data,
5082        )
5083        .expect("install parent closure into client");
5084        repo.refs()
5085            .set_thread(&ThreadName::from("main"), &parent)
5086            .expect("set thread to parent");
5087        // Sanity: the client provably holds the parent's full closure...
5088        wire::enumerate_state_closure(repo.store(), parent).expect("client holds parent closure");
5089        // ...but NOT the child yet.
5090        assert!(
5091            repo.store()
5092                .get_state(&child)
5093                .expect("probe child")
5094                .is_none(),
5095            "client must start without the child state"
5096        );
5097        let parent_clone = parent;
5098        let full_pack =
5099            wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
5100        // Delta = child closure minus the parent closure (what the server
5101        // would send when the parent is advertised).
5102        let delta_objects = wire::enumerate_state_closure_with_options(
5103            src_repo.store(),
5104            child,
5105            wire::StateClosureOptions {
5106                depth: None,
5107                exclude_states: vec![parent_clone],
5108            },
5109        )
5110        .expect("delta closure");
5111        assert!(
5112            !delta_objects.is_empty() && delta_objects.len() < child_closure.len(),
5113            "delta must be a strict, non-empty subset of the full closure"
5114        );
5115        let delta_pack =
5116            wire::build_native_pack(src_repo.store(), &delta_objects).expect("delta pack");
5117
5118        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5119        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5120            remote_state: child,
5121            full_closure: child_closure.clone(),
5122            delta_objects: delta_objects.clone(),
5123            known_parent: parent_clone,
5124            full_pack,
5125            delta_pack,
5126            captured_exclude: captured.clone(),
5127        })
5128        .await
5129        else {
5130            return;
5131        };
5132
5133        let exchange = tokio::time::timeout(
5134            Duration::from_secs(5),
5135            client.pull_exchange(
5136                &repo,
5137                "owner/repo",
5138                "main",
5139                PullOptions {
5140                    local_thread: None,
5141                    depth: None,
5142                    target_state: None,
5143                    materialization: PullMaterialization::Full,
5144                },
5145            ),
5146        )
5147        .await
5148        .expect("behind-client pull must not hang")
5149        .expect("behind-client pull succeeds");
5150        server.abort();
5151
5152        let advertised = captured.lock().unwrap().clone().expect("request captured");
5153        assert_eq!(
5154            advertised,
5155            vec![parent.to_string_full()],
5156            "a behind client must advertise the parent head it holds"
5157        );
5158        assert!(exchange.result.success);
5159        // The client must now hold the COMPLETE child closure — every object
5160        // in the remote's full closure must be present locally, proving the
5161        // server-side parent-pruning dropped nothing the client lacked.
5162        let reassembled = wire::enumerate_state_closure(repo.store(), child)
5163            .expect("the client must hold the complete child closure after a delta pull");
5164        assert_eq!(
5165            reassembled.len(),
5166            child_closure.len(),
5167            "delta pull must leave the client with the full child closure, no gaps"
5168        );
5169    }
5170
5171    #[tokio::test]
5172    async fn fresh_bare_pull_advertises_nothing_and_gets_full_closure() {
5173        // Unseeded local repo (the `heddle clone` shape): no local head for
5174        // the thread, so nothing is advertised and the server sends the full
5175        // closure (today's behavior).
5176        let (_dir, repo) = temp_repo_unseeded();
5177        let (_src_dir, src_repo) = temp_repo_unseeded();
5178        let remote = put_state_with_blob(&src_repo, "seed", vec![]);
5179        let full_closure =
5180            wire::enumerate_state_closure(src_repo.store(), remote).expect("closure");
5181        let full_pack =
5182            wire::build_native_pack(src_repo.store(), &full_closure).expect("full pack");
5183
5184        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5185        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5186            remote_state: remote,
5187            full_closure: full_closure.clone(),
5188            delta_objects: Vec::new(),
5189            known_parent: ChangeId::generate(),
5190            full_pack: full_pack.clone(),
5191            delta_pack: full_pack,
5192            captured_exclude: captured.clone(),
5193        })
5194        .await
5195        else {
5196            return;
5197        };
5198
5199        let exchange = tokio::time::timeout(
5200            Duration::from_secs(5),
5201            client.pull_exchange(
5202                &repo,
5203                "owner/repo",
5204                "main",
5205                PullOptions {
5206                    local_thread: None,
5207                    depth: None,
5208                    target_state: None,
5209                    materialization: PullMaterialization::Full,
5210                },
5211            ),
5212        )
5213        .await
5214        .expect("fresh bare pull must not hang")
5215        .expect("fresh bare pull succeeds");
5216        server.abort();
5217
5218        let advertised = captured.lock().unwrap().clone().expect("request captured");
5219        assert!(
5220            advertised.is_empty(),
5221            "a fresh repo must advertise no exclude_states"
5222        );
5223        assert!(exchange.result.success);
5224        assert_eq!(
5225            exchange.object_count,
5226            full_closure.len(),
5227            "a fresh pull must receive the full closure, nothing wrongly excluded"
5228        );
5229        assert!(
5230            wire::enumerate_state_closure(repo.store(), remote).is_ok(),
5231            "fresh pull must install the complete closure"
5232        );
5233    }
5234
5235    #[tokio::test]
5236    async fn explicit_local_thread_incomplete_closure_does_not_advertise_and_repairs() {
5237        // The footgun this PR closes: a user passes `--local-thread` against a
5238        // repo whose named thread head has an INCOMPLETE closure (an absent
5239        // parent state — a partial clone or interrupted prior pull). The
5240        // explicit branch must NOT advertise it (that would make the server
5241        // prune objects the client lacks and silently leave it corrupt). The
5242        // completeness gate refuses, the pull falls back to the full closure,
5243        // and the client ends COMPLETE.
5244        //
5245        // Build parent + child in the SOURCE repo (the server's view).
5246        let (_src_dir, src_repo) = temp_repo_unseeded();
5247        let parent = put_state_with_blob(&src_repo, "base", vec![]);
5248        let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
5249        let child_closure =
5250            wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
5251        let full_pack =
5252            wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
5253
5254        // The CLIENT's `feature` thread points at `child`, but the client only
5255        // holds child's own metadata, NOT the parent closure — an incomplete
5256        // local closure. Copy just the child-specific objects (closure minus
5257        // parent) so the parent state is genuinely absent.
5258        let (_dir, repo) = temp_repo_unseeded();
5259        let child_only = wire::enumerate_state_closure_with_options(
5260            src_repo.store(),
5261            child,
5262            wire::StateClosureOptions {
5263                depth: None,
5264                exclude_states: vec![parent],
5265            },
5266        )
5267        .expect("child-only objects");
5268        let child_only_pack =
5269            wire::build_native_pack(src_repo.store(), &child_only).expect("child-only pack");
5270        wire::install_received_pack(
5271            repo.store(),
5272            &child_only_pack.pack_data,
5273            &child_only_pack.index_data,
5274        )
5275        .expect("install child-only objects");
5276        repo.refs()
5277            .set_thread(&ThreadName::from("feature"), &child)
5278            .expect("set local thread to child");
5279        // Sanity: the named thread's closure is INCOMPLETE locally (parent
5280        // state absent), exactly the dangerous over-advertise setup.
5281        assert!(
5282            matches!(
5283                wire::enumerate_state_closure(repo.store(), child),
5284                Err(ProtocolError::ObjectNotFound(_))
5285            ),
5286            "the explicit thread head's closure must start incomplete"
5287        );
5288
5289        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5290        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5291            remote_state: child,
5292            full_closure: child_closure.clone(),
5293            delta_objects: Vec::new(),
5294            known_parent: parent,
5295            full_pack: full_pack.clone(),
5296            delta_pack: full_pack,
5297            captured_exclude: captured.clone(),
5298        })
5299        .await
5300        else {
5301            return;
5302        };
5303
5304        let exchange = tokio::time::timeout(
5305            Duration::from_secs(5),
5306            client.pull_exchange(
5307                &repo,
5308                "owner/repo",
5309                "main",
5310                PullOptions {
5311                    local_thread: Some("feature"),
5312                    depth: None,
5313                    target_state: None,
5314                    materialization: PullMaterialization::Full,
5315                },
5316            ),
5317        )
5318        .await
5319        .expect("explicit-thread pull must not hang")
5320        .expect("explicit-thread pull succeeds");
5321        server.abort();
5322
5323        let advertised = captured.lock().unwrap().clone().expect("request captured");
5324        assert!(
5325            advertised.is_empty(),
5326            "an explicit --local-thread with an incomplete closure must advertise nothing, \
5327             falling back to a full pull (got {advertised:?})"
5328        );
5329        assert!(exchange.result.success);
5330        // The repair: the client must now hold the COMPLETE child closure.
5331        let reassembled = wire::enumerate_state_closure(repo.store(), child)
5332            .expect("the client must hold the complete child closure after the fallback full pull");
5333        assert_eq!(
5334            reassembled.len(),
5335            child_closure.len(),
5336            "the full-pull fallback must leave the client with the complete closure, no gaps"
5337        );
5338    }
5339
5340    #[tokio::test]
5341    async fn explicit_local_thread_complete_closure_advertises_and_fires_short_circuit() {
5342        // Fast path preserved: an explicit `--local-thread` whose head closure
5343        // IS fully local must still advertise that head, so the server can
5344        // prune to the delta (here the zero-delta short-circuit, since the
5345        // client is at the remote tip).
5346        let (_dir, repo) = temp_repo();
5347        let head = put_state_with_blob(&repo, "tip", vec![]);
5348        repo.refs()
5349            .set_thread(&ThreadName::from("feature"), &head)
5350            .expect("set local thread");
5351        // Sanity: the named thread's closure is fully present locally.
5352        wire::enumerate_state_closure(repo.store(), head)
5353            .expect("client holds the complete thread closure");
5354
5355        let full_closure =
5356            wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
5357        let full_pack =
5358            wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
5359        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5360
5361        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5362            remote_state: head,
5363            full_closure,
5364            delta_objects: Vec::new(),
5365            known_parent: ChangeId::generate(),
5366            full_pack: full_pack.clone(),
5367            delta_pack: full_pack,
5368            captured_exclude: captured.clone(),
5369        })
5370        .await
5371        else {
5372            return;
5373        };
5374
5375        let exchange = tokio::time::timeout(
5376            Duration::from_secs(5),
5377            client.pull_exchange(
5378                &repo,
5379                "owner/repo",
5380                "main",
5381                PullOptions {
5382                    local_thread: Some("feature"),
5383                    depth: None,
5384                    target_state: None,
5385                    materialization: PullMaterialization::Full,
5386                },
5387            ),
5388        )
5389        .await
5390        .expect("explicit-thread pull must not hang")
5391        .expect("explicit-thread pull succeeds");
5392        server.abort();
5393
5394        let advertised = captured.lock().unwrap().clone().expect("request captured");
5395        assert_eq!(
5396            advertised,
5397            vec![head.to_string_full()],
5398            "an explicit --local-thread with a complete closure must advertise its head"
5399        );
5400        assert!(exchange.result.success);
5401        assert_eq!(
5402            exchange.object_count, 0,
5403            "advertising the complete head must fire the zero-delta short-circuit, not a full pull"
5404        );
5405    }
5406}