Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    io::{self, Write},
4    sync::{
5        Arc,
6        atomic::{AtomicUsize, Ordering},
7    },
8    time::{Duration, Instant},
9};
10
11use grpc::heddle::v1::{
12    GetBlobRequest, GitCheckpointTransfer, GitLaneTransfer, GitPackTransfer, GitRefKind,
13    GitRefUpdateTransfer, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
14    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
15    RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
16    ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
17    git_lane_transfer, pull_message, push_message,
18};
19use objects::{
20    object::{ChangeId, ContentHash, MarkerName, ThreadName},
21    store::{AnyStore, ObjectStore, PackObjectId},
22};
23use repo::{
24    GitCheckpointRecord, Repository, RepositoryCapability, RevisionAddress, SyncedThreadMetadata,
25    ThreadManager,
26};
27use sley::{
28    ObjectId as GitObjectId, RefPrecondition, ReferenceTarget, Repository as SleyRepository,
29};
30use tokio::sync::mpsc;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::Request;
33use wire::{
34    ObjectInfo, ObjectType, PlannedObject, ProtocolError, PullComplete, PushComplete, RefEntry,
35    RefUpdated,
36};
37
38use super::{
39    HostedGrpcClient, PullMaterialization,
40    helpers::{
41        descriptor_id, descriptor_id_from_info, object_descriptor_with_status, object_type_name,
42        parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
43        transport_mode_name,
44    },
45};
46
47#[derive(Clone, Copy)]
48struct PullOptions<'a> {
49    local_thread: Option<&'a str>,
50    depth: Option<u32>,
51    target_state: Option<ChangeId>,
52    materialization: PullMaterialization,
53}
54
55struct PullWantPlan {
56    wants: Vec<ObjectDescriptor>,
57    wanted_types: WantedTypes,
58    want_full_closure: bool,
59}
60
61type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
62
63struct GitLanePushPlan {
64    local_revision_address: String,
65    pack: GitPackPushPlan,
66    ref_update: PushMessage,
67}
68
69#[derive(Clone)]
70struct GitPackPushPlan {
71    transfer_id: String,
72    pack_id: Vec<u8>,
73    pack_size: u64,
74    root: GitObjectId,
75    git_repo: SleyRepository,
76}
77
78const PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD: usize = 512;
79const PULL_PACK_SPOOL_OBJECT_THRESHOLD: usize = 512;
80const NATIVE_PACK_DRAIN_OBJECT_INTERVAL: usize = 32;
81const NATIVE_PACK_OBJECT_PREFETCH_LIMIT: usize = 32;
82const NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT: usize = 8;
83
84#[derive(Debug, Clone, Default)]
85pub struct PullObjectMix {
86    pub blobs: usize,
87    pub trees: usize,
88    pub states: usize,
89    pub actions: usize,
90    pub redactions: usize,
91    pub state_visibilities: usize,
92}
93
94#[derive(Debug, Clone)]
95pub struct HostedRefEntry {
96    pub name: String,
97    pub change_id: ChangeId,
98    pub is_thread: bool,
99    pub revision_address: String,
100}
101
102impl PullObjectMix {
103    fn record(&mut self, obj_type: ObjectType) {
104        match obj_type {
105            ObjectType::Blob => self.blobs += 1,
106            ObjectType::Tree => self.trees += 1,
107            ObjectType::State => self.states += 1,
108            ObjectType::Action => self.actions += 1,
109            ObjectType::Redaction => self.redactions += 1,
110            ObjectType::StateVisibility => self.state_visibilities += 1,
111        }
112    }
113
114    pub fn total(&self) -> usize {
115        self.blobs
116            + self.trees
117            + self.states
118            + self.actions
119            + self.redactions
120            + self.state_visibilities
121    }
122}
123
124#[derive(Debug, Clone, Default)]
125pub struct PullProfile {
126    pub ready_wait: Duration,
127    pub receive_and_apply: Duration,
128    pub decode: Duration,
129    pub store_receive_object: Duration,
130    pub metadata_sync: Duration,
131    pub pack_decode_apply: Duration,
132    pub raw_decode_apply: Duration,
133    pub pack_decode: Duration,
134    pub raw_decode: Duration,
135    pub bytes_received: usize,
136    pub pack_bytes_received: usize,
137    pub raw_bytes_received: usize,
138    pub objects_received: usize,
139    pub object_mix: PullObjectMix,
140}
141
142impl HostedGrpcClient {
143    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
144        Ok(self
145            .list_refs_with_revision_addresses(repo_path)
146            .await?
147            .into_iter()
148            .map(|entry| RefEntry {
149                name: entry.name,
150                change_id: entry.change_id,
151                is_thread: entry.is_thread,
152            })
153            .collect())
154    }
155
156    pub async fn list_refs_with_revision_addresses(
157        &mut self,
158        repo_path: &str,
159    ) -> Result<Vec<HostedRefEntry>, ProtocolError> {
160        let mut request = Request::new(ListRefsRequest {
161            repo_path: repo_path.to_string(),
162        });
163        self.apply_auth(&mut request)?;
164        let response = self
165            .inner
166            .list_refs(request)
167            .await
168            .map_err(status_to_protocol_error)?
169            .into_inner();
170        response
171            .refs
172            .into_iter()
173            .map(|entry| {
174                Ok(HostedRefEntry {
175                    name: entry.name,
176                    change_id: ChangeId::try_from_slice(&entry.change_id)
177                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
178                    is_thread: entry.is_thread,
179                    revision_address: entry.revision_address,
180                })
181            })
182            .collect()
183    }
184
185    #[allow(clippy::too_many_arguments)]
186    pub async fn update_ref(
187        &mut self,
188        repo_path: &str,
189        name: &str,
190        is_thread: bool,
191        old_value: Option<ChangeId>,
192        new_value: ChangeId,
193        force: bool,
194        thread_metadata: Option<&SyncedThreadMetadata>,
195    ) -> Result<RefUpdated, ProtocolError> {
196        let mut request = Request::new(UpdateRefRequest {
197            repo_path: repo_path.to_string(),
198            name: name.to_string(),
199            is_thread,
200            force,
201            old_value: old_value
202                .map(|value| value.to_string_full())
203                .unwrap_or_default(),
204            new_value: new_value.to_string_full(),
205            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
206            old_revision_address: old_value.map(native_revision_address).unwrap_or_default(),
207            new_revision_address: native_revision_address(new_value),
208            client_operation_id: String::new(),
209        });
210        self.apply_auth(&mut request)?;
211        let response = self
212            .inner
213            .update_ref(request)
214            .await
215            .map_err(status_to_protocol_error)?
216            .into_inner();
217        Ok(RefUpdated {
218            success: response.success,
219            old_value: if response.old_value.is_empty() {
220                None
221            } else {
222                Some(
223                    ChangeId::parse(&response.old_value)
224                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
225                )
226            },
227            error: (!response.error.is_empty()).then_some(response.error),
228        })
229    }
230
231    pub async fn push(
232        &mut self,
233        repo: &Repository,
234        repo_path: &str,
235        local_state: ChangeId,
236        target_thread: &str,
237        force: bool,
238    ) -> Result<PushComplete, ProtocolError> {
239        self.push_with_revision(
240            repo,
241            repo_path,
242            local_state,
243            target_thread,
244            force,
245            native_revision_address(local_state),
246            None,
247        )
248        .await
249    }
250
251    pub async fn push_git_overlay_checkpoint(
252        &mut self,
253        repo: &Repository,
254        repo_path: &str,
255        local_state: ChangeId,
256        target_thread: &str,
257        force: bool,
258    ) -> Result<PushComplete, ProtocolError> {
259        let git_lane = build_git_lane_push_plan(
260            repo,
261            local_state,
262            target_thread,
263            self.transport.chunk_size.max(1),
264        )?;
265        let local_revision_address = git_lane.local_revision_address.clone();
266        self.push_with_revision(
267            repo,
268            repo_path,
269            local_state,
270            target_thread,
271            force,
272            local_revision_address,
273            Some(git_lane),
274        )
275        .await
276    }
277
278    #[allow(clippy::too_many_arguments)]
279    async fn push_with_revision(
280        &mut self,
281        repo: &Repository,
282        repo_path: &str,
283        local_state: ChangeId,
284        target_thread: &str,
285        force: bool,
286        local_revision_address: String,
287        mut git_lane: Option<GitLanePushPlan>,
288    ) -> Result<PushComplete, ProtocolError> {
289        let _ = self.transport.chunk_size;
290        let _ = self.transport.resume_attempts;
291        let object_plan = wire::enumerate_state_closure_plan(repo.store(), local_state)?;
292        let full_objects = if object_plan.len() <= PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD {
293            Some(wire::enumerate_state_closure(repo.store(), local_state)?)
294        } else {
295            None
296        };
297        let object_count = full_objects
298            .as_ref()
299            .map_or(object_plan.len(), std::vec::Vec::len);
300        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
301        let transport_mode = preferred_transport_mode(&self.transport, object_count);
302        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
303        let request_message = PushMessage {
304            body: Some(push_message::Body::Request(PushRequest {
305                repo_path: repo_path.to_string(),
306                local_state: local_state.to_string_full(),
307                target_thread: target_thread.to_string(),
308                create_thread: true,
309                force,
310                objects: full_objects.as_ref().map_or_else(
311                    || object_plan.iter().map(to_proto_planned_object).collect(),
312                    |objects| objects.iter().map(to_proto_object_info).collect(),
313                ),
314                transfer: Some(self.transport.transfer_checkpoint_with_mode(
315                    transfer_id.clone(),
316                    transport_mode,
317                    0,
318                    0,
319                    false,
320                )),
321                partial_fetch_status: partial_fetch_status_for_repo(repo),
322                allow_partial_fetch: true,
323                thread_metadata: thread_metadata
324                    .map(|metadata| to_proto_thread_metadata(&metadata)),
325                local_revision_address,
326                client_operation_id: String::new(),
327            })),
328        };
329
330        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
331        tx.send(request_message).await.map_err(|_| {
332            ProtocolError::InvalidState("failed to initialize push stream".to_string())
333        })?;
334        let mut request = Request::new(ReceiverStream::new(rx));
335        self.apply_auth(&mut request)?;
336        let mut response = self
337            .inner
338            .push(request)
339            .await
340            .map_err(status_to_protocol_error)?
341            .into_inner();
342
343        let ready = match response.message().await.map_err(status_to_protocol_error)? {
344            Some(PushMessage {
345                body: Some(push_message::Body::Ready(ready)),
346            }) => ready,
347            _ => {
348                return Err(ProtocolError::InvalidState(
349                    "expected PushReady from gRPC server".to_string(),
350                ));
351            }
352        };
353        if let Some(git_lane) = git_lane.as_mut() {
354            apply_git_ref_expectation(&mut git_lane.ref_update, &ready.remote_revision_address)?;
355        }
356
357        let object_index = match full_objects {
358            Some(objects) => objects
359                .into_iter()
360                .map(|info| (descriptor_id_from_info(&info), info))
361                .collect::<HashMap<_, _>>(),
362            None => object_plan
363                .into_iter()
364                .map(|object| {
365                    (
366                        descriptor_id_from_plan(&object),
367                        object_info_from_plan(&object),
368                    )
369                })
370                .collect::<HashMap<_, _>>(),
371        };
372
373        let ready_transport_mode = ready
374            .transfer
375            .as_ref()
376            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
377            .unwrap_or(transport_mode);
378        let wanted_infos = ready
379            .want_objects
380            .into_iter()
381            .map(|want| {
382                object_index
383                    .get(&descriptor_id(&want))
384                    .cloned()
385                    .ok_or_else(|| {
386                        ProtocolError::InvalidState("server requested unknown object".to_string())
387                    })
388            })
389            .collect::<Result<Vec<_>, _>>()?;
390
391        // Split want_objects: blob/tree/state/action → native pack;
392        // sidecars → out-of-pack transfer channels. Sidecars live outside
393        // `.heddle/objects/` so GC can't reach them and they can't ride the
394        // pack — `build_native_pack` skips the same object-type set.
395        let (wanted_sidecars, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
396            .into_iter()
397            .partition(|info| is_out_of_pack_transfer_object_type(info.obj_type));
398
399        if !wanted_packable.is_empty() {
400            send_native_pack_streaming_messages(
401                &tx,
402                repo,
403                &wanted_packable,
404                &transfer_id,
405                self.transport.chunk_size.max(1),
406                &self.transport,
407                ready_transport_mode,
408            )
409            .await?;
410        }
411
412        for info in wanted_sidecars {
413            let message = sidecar_push_message(repo, info)?;
414            tx.send(message).await.map_err(|_| {
415                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
416            })?;
417        }
418
419        if let Some(git_lane) = git_lane {
420            send_git_pack_streaming_messages(&tx, &git_lane.pack, self.transport.chunk_size.max(1))
421                .await?;
422            tx.send(git_lane.ref_update).await.map_err(|_| {
423                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
424            })?;
425        }
426        drop(tx);
427
428        let result = match response.message().await.map_err(status_to_protocol_error)? {
429            Some(PushMessage {
430                body: Some(push_message::Body::Complete(complete)),
431            }) => PushComplete {
432                success: complete.success,
433                new_state: if complete.new_state.is_empty() {
434                    None
435                } else {
436                    Some(
437                        ChangeId::parse(&complete.new_state)
438                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
439                    )
440                },
441                error: (!complete.error.is_empty()).then_some(complete.error),
442                transfer_id: complete
443                    .transfer
444                    .as_ref()
445                    .map(|transfer| transfer.transfer_id.clone())
446                    .unwrap_or_default(),
447                transport_mode: complete
448                    .transfer
449                    .as_ref()
450                    .map(|transfer| transport_mode_name(transfer.transport_mode))
451                    .unwrap_or("raw-objects")
452                    .to_string(),
453                resume_offset: complete
454                    .transfer
455                    .as_ref()
456                    .map(|transfer| transfer.resume_offset)
457                    .unwrap_or_default(),
458                chunk_index: complete
459                    .transfer
460                    .as_ref()
461                    .map(|transfer| transfer.chunk_index)
462                    .unwrap_or_default(),
463                checkpoint: complete
464                    .transfer
465                    .as_ref()
466                    .map(|transfer| transfer.checkpoint.clone())
467                    .unwrap_or_default(),
468                is_complete: complete
469                    .transfer
470                    .as_ref()
471                    .map(|transfer| transfer.is_complete)
472                    .unwrap_or(false),
473            },
474            _ => {
475                return Err(ProtocolError::InvalidState(
476                    "expected PushComplete from gRPC server".to_string(),
477                ));
478            }
479        };
480
481        if result.success {
482            self.sync_remote_markers(repo, repo_path, local_state)
483                .await?;
484        }
485        Ok(result)
486    }
487
488    pub async fn pull(
489        &mut self,
490        repo: &Repository,
491        repo_path: &str,
492        remote_thread: &str,
493        local_thread: Option<&str>,
494    ) -> Result<PullComplete, ProtocolError> {
495        self.pull_with_options(
496            repo,
497            repo_path,
498            remote_thread,
499            PullOptions {
500                local_thread,
501                depth: None,
502                target_state: None,
503                materialization: PullMaterialization::Full,
504            },
505        )
506        .await
507    }
508
509    pub async fn pull_profiled(
510        &mut self,
511        repo: &Repository,
512        repo_path: &str,
513        remote_thread: &str,
514        local_thread: Option<&str>,
515    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
516        self.pull_exchange(
517            repo,
518            repo_path,
519            remote_thread,
520            PullOptions {
521                local_thread,
522                depth: None,
523                target_state: None,
524                materialization: PullMaterialization::Full,
525            },
526        )
527        .await
528        .map(|exchange| (exchange.result, exchange.profile))
529    }
530
531    pub async fn pull_partial(
532        &mut self,
533        repo: &Repository,
534        repo_path: &str,
535        remote_thread: &str,
536        local_thread: Option<&str>,
537    ) -> Result<PullComplete, ProtocolError> {
538        self.pull_with_options(
539            repo,
540            repo_path,
541            remote_thread,
542            PullOptions {
543                local_thread,
544                depth: None,
545                target_state: None,
546                materialization: PullMaterialization::Lazy,
547            },
548        )
549        .await
550    }
551
552    pub async fn pull_with_depth_and_materialization(
553        &mut self,
554        repo: &Repository,
555        repo_path: &str,
556        remote_thread: &str,
557        local_thread: Option<&str>,
558        depth: Option<u32>,
559        materialization: PullMaterialization,
560    ) -> Result<PullComplete, ProtocolError> {
561        self.pull_with_options(
562            repo,
563            repo_path,
564            remote_thread,
565            PullOptions {
566                local_thread,
567                depth,
568                target_state: None,
569                materialization,
570            },
571        )
572        .await
573    }
574
575    pub async fn pull_with_depth(
576        &mut self,
577        repo: &Repository,
578        repo_path: &str,
579        remote_thread: &str,
580        local_thread: Option<&str>,
581        depth: Option<u32>,
582    ) -> Result<PullComplete, ProtocolError> {
583        self.pull_with_depth_and_materialization(
584            repo,
585            repo_path,
586            remote_thread,
587            local_thread,
588            depth,
589            PullMaterialization::Full,
590        )
591        .await
592    }
593
594    pub async fn fetch_state(
595        &mut self,
596        repo: &Repository,
597        repo_path: &str,
598        remote_thread: &str,
599        target_state: ChangeId,
600    ) -> Result<usize, ProtocolError> {
601        self.pull_exchange(
602            repo,
603            repo_path,
604            remote_thread,
605            PullOptions {
606                local_thread: None,
607                depth: None,
608                target_state: Some(target_state),
609                materialization: PullMaterialization::Full,
610            },
611        )
612        .await
613        .map(|exchange| exchange.object_count)
614    }
615
616    pub async fn fetch_state_partial(
617        &mut self,
618        repo: &Repository,
619        repo_path: &str,
620        remote_thread: &str,
621        target_state: ChangeId,
622    ) -> Result<usize, ProtocolError> {
623        self.pull_exchange(
624            repo,
625            repo_path,
626            remote_thread,
627            PullOptions {
628                local_thread: None,
629                depth: None,
630                target_state: Some(target_state),
631                materialization: PullMaterialization::Lazy,
632            },
633        )
634        .await
635        .map(|exchange| exchange.object_count)
636    }
637
638    pub async fn hydrate_blob_at_path(
639        &mut self,
640        repo: &Repository,
641        repo_path: &str,
642        reference: &str,
643        path: &str,
644    ) -> Result<objects::object::Blob, ProtocolError> {
645        let mut request = Request::new(GetBlobRequest {
646            repo_path: repo_path.to_string(),
647            r#ref: reference.to_string(),
648            path: path.to_string(),
649        });
650        self.apply_auth(&mut request)?;
651        let response = self
652            .content
653            .get_blob(request)
654            .await
655            .map_err(status_to_protocol_error)?
656            .into_inner();
657
658        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
659        let blob = objects::object::Blob::new(content);
660        repo.store().put_blob(&blob)?;
661        repo.clear_missing_blob(&blob.hash())?;
662        Ok(blob)
663    }
664
665    pub async fn hydrate_missing_blobs_for_state(
666        &mut self,
667        repo: &Repository,
668        repo_path: &str,
669        remote_thread: &str,
670        target_state: ChangeId,
671    ) -> Result<usize, ProtocolError> {
672        let exchange = self
673            .pull_exchange(
674                repo,
675                repo_path,
676                remote_thread,
677                PullOptions {
678                    local_thread: None,
679                    depth: None,
680                    target_state: Some(target_state),
681                    materialization: PullMaterialization::Full,
682                },
683            )
684            .await?;
685        clear_missing_blobs_for_state(repo, target_state)?;
686        Ok(exchange.object_count)
687    }
688
689    async fn pull_with_options(
690        &mut self,
691        repo: &Repository,
692        repo_path: &str,
693        remote_thread: &str,
694        options: PullOptions<'_>,
695    ) -> Result<PullComplete, ProtocolError> {
696        self.pull_exchange(repo, repo_path, remote_thread, options)
697            .await
698            .map(|exchange| exchange.result)
699    }
700
701    async fn pull_exchange(
702        &mut self,
703        repo: &Repository,
704        repo_path: &str,
705        remote_thread: &str,
706        options: PullOptions<'_>,
707    ) -> Result<PullExchange, ProtocolError> {
708        let exchange_start = Instant::now();
709        let mut exclude_states = Vec::new();
710        // Whether the head comes from an explicit `--local-thread` or is
711        // inferred from the bare remote thread, it is advertised as
712        // `exclude_states` ONLY when its full object closure is provably
713        // present locally. The server trusts `exclude_states` blindly and
714        // prunes the advertised closure — so advertising a head whose closure
715        // we lack (a partial/lazy clone, an interrupted prior pull) would make
716        // the server omit those objects and silently leave us with an
717        // incomplete repo. Both branches therefore share the same completeness
718        // gate; when it refuses, we fall back to the correct (slower)
719        // empty-exclude full pull.
720        let advertised_head = if let Some(local_thread) = options.local_thread {
721            locally_complete_local_thread_head(repo, local_thread, options.target_state)?
722        } else {
723            locally_complete_pull_head(repo, remote_thread, options.target_state)?
724        };
725        if let Some(head) = advertised_head {
726            exclude_states.push(head);
727        }
728        let allow_partial_fetch = options.materialization.allows_partial_fetch();
729        let fresh_full_pull =
730            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
731
732        let transfer_id = pull_transfer_id(
733            repo_path,
734            remote_thread,
735            options.local_thread,
736            options.depth,
737            options.target_state,
738        );
739        let request_message = PullMessage {
740            body: Some(pull_message::Body::Request(PullRequest {
741                repo_path: repo_path.to_string(),
742                remote_thread: remote_thread.to_string(),
743                local_thread: options.local_thread.unwrap_or_default().to_string(),
744                target_state: options
745                    .target_state
746                    .map(|value| value.to_string_full())
747                    .unwrap_or_default(),
748                depth: options.depth.unwrap_or_default(),
749                exclude_states: exclude_states
750                    .iter()
751                    .map(ChangeId::to_string_full)
752                    .collect(),
753                transfer: Some(self.transport.transfer_checkpoint_with_mode(
754                    transfer_id.clone(),
755                    TransportMode::NativePack,
756                    0,
757                    0,
758                    false,
759                )),
760                partial_fetch_status: partial_fetch_status_for_repo(repo),
761                allow_partial_fetch,
762                fresh_full_pull,
763                target_revision_address: options
764                    .target_state
765                    .map(native_revision_address)
766                    .unwrap_or_default(),
767                client_operation_id: String::new(),
768            })),
769        };
770
771        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
772        tx.send(request_message).await.map_err(|_| {
773            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
774        })?;
775        let mut request = Request::new(ReceiverStream::new(rx));
776        self.apply_auth(&mut request)?;
777        let mut response = self
778            .inner
779            .pull(request)
780            .await
781            .map_err(status_to_protocol_error)?
782            .into_inner();
783
784        let ready = match response.message().await.map_err(status_to_protocol_error)? {
785            Some(PullMessage {
786                body: Some(pull_message::Body::Ready(ready)),
787            }) => ready,
788            _ => {
789                return Err(ProtocolError::InvalidState(
790                    "expected PullReady from gRPC server".to_string(),
791                ));
792            }
793        };
794        let mut profile = PullProfile {
795            ready_wait: exchange_start.elapsed(),
796            ..PullProfile::default()
797        };
798        let remote_state = ChangeId::parse(&ready.remote_state)
799            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
800        let advertised_object_count = ready.objects_to_fetch.len();
801        let PullWantPlan {
802            wants,
803            wanted_types,
804            want_full_closure,
805        } = plan_pull_wants(
806            repo,
807            &remote_state,
808            ready.full_closure_available,
809            ready.objects_to_fetch,
810            allow_partial_fetch,
811        )?;
812        let native_pack_required = native_pack_required_for_pull(want_full_closure, &wanted_types);
813
814        tx.send(PullMessage {
815            body: Some(pull_message::Body::Want(WantObjects {
816                objects: wants.clone(),
817                want_full_closure,
818                transfer: Some(self.transport.transfer_checkpoint_with_mode(
819                    transfer_id.clone(),
820                    TransportMode::NativePack,
821                    0,
822                    0,
823                    false,
824                )),
825            })),
826        })
827        .await
828        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
829        drop(tx);
830
831        let receive_start = Instant::now();
832        let use_pack_spool = advertised_object_count > PULL_PACK_SPOOL_OBJECT_THRESHOLD;
833        let mut pack_state = wire::PackChunkState::default();
834        let mut pack_spool = if use_pack_spool {
835            Some(wire::PackChunkSpool::new_in(repo.heddle_dir())?)
836        } else {
837            None
838        };
839        let mut git_lane_repo = None;
840        let mut git_pack_state = GitPackPullInstallState::default();
841        let mut received = 0usize;
842        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
843            match message.body {
844                Some(pull_message::Body::Pack(chunk)) => {
845                    profile.bytes_received =
846                        profile.bytes_received.saturating_add(chunk.data.len());
847                    profile.pack_bytes_received =
848                        profile.pack_bytes_received.saturating_add(chunk.data.len());
849                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
850                        ProtocolError::InvalidState(
851                            "native pack chunk missing transfer checkpoint".to_string(),
852                        )
853                    })?;
854                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
855                        .unwrap_or(PackStreamKind::Unspecified);
856                    if stream_kind == PackStreamKind::Unspecified {
857                        return Err(ProtocolError::InvalidState(
858                            "native pack chunk missing stream kind".to_string(),
859                        ));
860                    }
861                    let decode_start = Instant::now();
862                    if let Some(pack_spool) = pack_spool.as_mut() {
863                        pack_spool.receive_chunk(
864                            stream_kind == PackStreamKind::Index,
865                            transfer.resume_offset,
866                            transfer.chunk_index,
867                            transfer.is_complete,
868                            &chunk.data,
869                            chunk.is_final_chunk,
870                        )?;
871                    } else {
872                        wire::receive_pack_chunk(
873                            &mut pack_state,
874                            stream_kind == PackStreamKind::Index,
875                            transfer.resume_offset,
876                            transfer.chunk_index,
877                            transfer.is_complete,
878                            &chunk.data,
879                            chunk.is_final_chunk,
880                        )?;
881                    }
882                    let decode_elapsed = decode_start.elapsed();
883                    profile.pack_decode += decode_elapsed;
884                    profile.pack_decode_apply += decode_elapsed;
885                    profile.decode += decode_elapsed;
886                }
887                Some(pull_message::Body::Redaction(transfer)) => {
888                    // Out-of-pack channel: receive a redaction sidecar
889                    // and route through `Repository::accept_wire_redactions`
890                    // for signature + trust-list verification. The
891                    // server emitted these only for blobs in our want
892                    // set that carry an active redaction.
893                    wire::check_received_transfer_blob_size(
894                        transfer.redactions_blob.len(),
895                        wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
896                        "redactions",
897                    )?;
898                    profile.bytes_received = profile
899                        .bytes_received
900                        .saturating_add(transfer.redactions_blob.len());
901                    profile.object_mix.record(ObjectType::Redaction);
902                    let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
903                        ProtocolError::InvalidState(format!(
904                            "RedactionTransfer.blob_hash is not a valid content hash: {err}"
905                        ))
906                    })?;
907                    let decode_start = Instant::now();
908                    repo.accept_wire_redactions(blob, &transfer.redactions_blob)
909                        .map_err(|err| {
910                            ProtocolError::InvalidState(format!(
911                                "accept_wire_redactions for blob {}: {err}",
912                                transfer.blob_hash
913                            ))
914                        })?;
915                    let decode_elapsed = decode_start.elapsed();
916                    profile.store_receive_object += decode_elapsed;
917                }
918                Some(pull_message::Body::StateVisibility(transfer)) => {
919                    wire::check_received_transfer_blob_size(
920                        transfer.state_visibility_blob.len(),
921                        wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
922                        "state-visibility",
923                    )?;
924                    profile.bytes_received = profile
925                        .bytes_received
926                        .saturating_add(transfer.state_visibility_blob.len());
927                    profile.object_mix.record(ObjectType::StateVisibility);
928                    let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
929                        ProtocolError::InvalidState(format!(
930                            "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
931                        ))
932                    })?;
933                    let decode_start = Instant::now();
934                    repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
935                        .map_err(|err| {
936                            ProtocolError::InvalidState(format!(
937                                "accept_wire_state_visibility for state {}: {err}",
938                                transfer.state_id
939                            ))
940                        })?;
941                    let decode_elapsed = decode_start.elapsed();
942                    profile.store_receive_object += decode_elapsed;
943                }
944                Some(pull_message::Body::GitLane(transfer)) => {
945                    let decode_start = Instant::now();
946                    profile.bytes_received = profile
947                        .bytes_received
948                        .saturating_add(git_lane_transfer_size(&transfer));
949                    accept_git_lane_pull_transfer(
950                        repo,
951                        &mut git_lane_repo,
952                        &mut git_pack_state,
953                        transfer,
954                    )?;
955                    let decode_elapsed = decode_start.elapsed();
956                    profile.store_receive_object += decode_elapsed;
957                }
958                Some(pull_message::Body::Complete(complete)) => {
959                    profile.receive_and_apply = receive_start.elapsed();
960                    git_pack_state.ensure_idle()?;
961                    let final_state = if complete.new_state.is_empty() {
962                        None
963                    } else {
964                        Some(
965                            ChangeId::parse(&complete.new_state)
966                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
967                        )
968                    };
969
970                    if complete.success {
971                        if native_pack_required {
972                            let store_start = Instant::now();
973                            let installed_ids = if let Some(pack_spool) = pack_spool.as_mut() {
974                                if !pack_spool.is_complete() {
975                                    return Err(ProtocolError::InvalidState(
976                                        "pull completed before native pack stream finished"
977                                            .to_string(),
978                                    ));
979                                }
980                                pack_spool.install_into(repo.store())?
981                            } else {
982                                if !pack_state.is_complete() {
983                                    return Err(ProtocolError::InvalidState(
984                                        "pull completed before native pack stream finished"
985                                            .to_string(),
986                                    ));
987                                }
988                                wire::install_received_pack(
989                                    repo.store(),
990                                    &pack_state.pack_data,
991                                    &pack_state.index_data,
992                                )?
993                            };
994                            profile.store_receive_object += store_start.elapsed();
995                            received = installed_ids.len();
996                            for id in installed_ids {
997                                match (id, wanted_packable_type(&wanted_types, &id)) {
998                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
999                                        profile.object_mix.record(ObjectType::Blob);
1000                                        repo.clear_missing_blob(&hash)?;
1001                                    }
1002                                    (_, Some(obj_type)) => {
1003                                        profile.object_mix.record(obj_type);
1004                                    }
1005                                    (PackObjectId::ChangeId(_), None) => {
1006                                        profile.object_mix.record(ObjectType::State);
1007                                    }
1008                                    (PackObjectId::Hash(hash), None) => {
1009                                        let inferred =
1010                                            infer_installed_hash_object_type(repo, &hash)?;
1011                                        profile.object_mix.record(inferred);
1012                                    }
1013                                }
1014                            }
1015                        }
1016
1017                        let metadata_start = Instant::now();
1018                        if let Some(local_thread) = options.local_thread
1019                            && let Some(state) = final_state
1020                        {
1021                            repo.refs()
1022                                .set_thread(&ThreadName::from(local_thread), &state)?;
1023                        }
1024                        if let Some(state) = final_state
1025                            && allow_partial_fetch
1026                        {
1027                            mark_missing_blobs_for_state(repo, state)?;
1028                        } else if final_state.is_some() {
1029                            let _ = repo.clear_all_missing_blobs()?;
1030                        }
1031                        let synced_markers = complete
1032                            .transfer
1033                            .as_ref()
1034                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
1035                            .transpose()?
1036                            .unwrap_or(false);
1037                        if !synced_markers {
1038                            self.sync_local_markers(repo, repo_path).await?;
1039                        }
1040                        profile.metadata_sync = metadata_start.elapsed();
1041                        profile.objects_received = received;
1042                        return Ok(PullExchange {
1043                            result: PullComplete {
1044                                success: true,
1045                                final_state,
1046                                error: None,
1047                                transfer_id: complete
1048                                    .transfer
1049                                    .as_ref()
1050                                    .map(|transfer| transfer.transfer_id.clone())
1051                                    .unwrap_or_default(),
1052                                transport_mode: complete
1053                                    .transfer
1054                                    .as_ref()
1055                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
1056                                    .unwrap_or("native-pack")
1057                                    .to_string(),
1058                                resume_offset: complete
1059                                    .transfer
1060                                    .as_ref()
1061                                    .map(|transfer| transfer.resume_offset)
1062                                    .unwrap_or_default(),
1063                                chunk_index: complete
1064                                    .transfer
1065                                    .as_ref()
1066                                    .map(|transfer| transfer.chunk_index)
1067                                    .unwrap_or_default(),
1068                                checkpoint: complete
1069                                    .transfer
1070                                    .as_ref()
1071                                    .map(|transfer| transfer.checkpoint.clone())
1072                                    .unwrap_or_default(),
1073                                is_complete: complete
1074                                    .transfer
1075                                    .as_ref()
1076                                    .map(|transfer| transfer.is_complete)
1077                                    .unwrap_or(false),
1078                            },
1079                            object_count: received,
1080                            profile,
1081                        });
1082                    }
1083
1084                    profile.objects_received = received;
1085                    return Ok(PullExchange {
1086                        result: PullComplete {
1087                            success: false,
1088                            final_state,
1089                            error: (!complete.error.is_empty()).then_some(complete.error),
1090                            transfer_id: complete
1091                                .transfer
1092                                .as_ref()
1093                                .map(|transfer| transfer.transfer_id.clone())
1094                                .unwrap_or_default(),
1095                            transport_mode: complete
1096                                .transfer
1097                                .as_ref()
1098                                .map(|transfer| transport_mode_name(transfer.transport_mode))
1099                                .unwrap_or("native-pack")
1100                                .to_string(),
1101                            resume_offset: complete
1102                                .transfer
1103                                .as_ref()
1104                                .map(|transfer| transfer.resume_offset)
1105                                .unwrap_or_default(),
1106                            chunk_index: complete
1107                                .transfer
1108                                .as_ref()
1109                                .map(|transfer| transfer.chunk_index)
1110                                .unwrap_or_default(),
1111                            checkpoint: complete
1112                                .transfer
1113                                .as_ref()
1114                                .map(|transfer| transfer.checkpoint.clone())
1115                                .unwrap_or_default(),
1116                            is_complete: complete
1117                                .transfer
1118                                .as_ref()
1119                                .map(|transfer| transfer.is_complete)
1120                                .unwrap_or(false),
1121                        },
1122                        object_count: received,
1123                        profile,
1124                    });
1125                }
1126                _ => {}
1127            }
1128        }
1129
1130        Err(ProtocolError::InvalidState(format!(
1131            "pull stream ended unexpectedly after receiving {received} packed objects"
1132        )))
1133    }
1134}
1135
1136fn redaction_push_message(
1137    repo: &Repository,
1138    info: wire::ObjectInfo,
1139) -> Result<PushMessage, ProtocolError> {
1140    let wire::ObjectId::Hash(blob) = info.id else {
1141        return Err(ProtocolError::InvalidState(
1142            "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
1143        ));
1144    };
1145    let hex = blob.to_hex();
1146    // Sender-side: load the byte-identical sidecar payload
1147    // that `Repository::put_redaction` wrote to disk. The
1148    // receiver verifies the signature + trust list and then
1149    // persists these bytes verbatim.
1150    let bytes = repo
1151        .store()
1152        .get_redactions_bytes_for_blob(&blob)
1153        .map_err(|err| {
1154            ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
1155        })?
1156        .ok_or_else(|| {
1157            ProtocolError::InvalidState(format!(
1158                "server wants redaction for blob {} but sender has no sidecar",
1159                hex
1160            ))
1161        })?;
1162    Ok(PushMessage {
1163        body: Some(push_message::Body::Redaction(RedactionTransfer {
1164            blob_hash: hex,
1165            redactions_blob: bytes.into(),
1166        })),
1167    })
1168}
1169
1170fn is_out_of_pack_transfer_object_type(obj_type: ObjectType) -> bool {
1171    matches!(
1172        obj_type,
1173        ObjectType::Redaction | ObjectType::StateVisibility
1174    )
1175}
1176
1177fn native_pack_required_for_pull(want_full_closure: bool, wanted_types: &WantedTypes) -> bool {
1178    want_full_closure
1179        || wanted_types
1180            .values()
1181            .flatten()
1182            .copied()
1183            .any(wire::is_native_packable_object_type)
1184}
1185
1186fn object_info_from_plan(object: &PlannedObject) -> ObjectInfo {
1187    ObjectInfo {
1188        id: object.id.clone(),
1189        obj_type: object.obj_type,
1190        size: 0,
1191        delta_base: None,
1192    }
1193}
1194
1195fn to_proto_planned_object(object: &PlannedObject) -> ObjectDescriptor {
1196    object_descriptor_with_status(
1197        &object_info_from_plan(object),
1198        ObjectAvailabilityStatus::Present,
1199        "",
1200    )
1201}
1202
1203fn descriptor_id_from_plan(object: &PlannedObject) -> (String, String) {
1204    let id = match &object.id {
1205        wire::ObjectId::Hash(hash) => hash.to_hex(),
1206        wire::ObjectId::ChangeId(change_id) => change_id.to_string_full(),
1207    };
1208    (id, object_type_name(object.obj_type).to_string())
1209}
1210
1211fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
1212    let types = wanted_types.entry(pack_id).or_default();
1213    if !types.contains(&obj_type) {
1214        types.push(obj_type);
1215    }
1216}
1217
1218fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
1219    wanted_types.get(pack_id).and_then(|types| {
1220        types
1221            .iter()
1222            .copied()
1223            .find(|obj_type| wire::is_native_packable_object_type(*obj_type))
1224    })
1225}
1226
1227fn sidecar_push_message(
1228    repo: &Repository,
1229    info: wire::ObjectInfo,
1230) -> Result<PushMessage, ProtocolError> {
1231    match info.obj_type {
1232        ObjectType::Redaction => redaction_push_message(repo, info),
1233        ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1234        obj_type => Err(ProtocolError::InvalidState(format!(
1235            "{obj_type:?} is not an out-of-pack sidecar object"
1236        ))),
1237    }
1238}
1239
1240fn state_visibility_push_message(
1241    repo: &Repository,
1242    info: wire::ObjectInfo,
1243) -> Result<PushMessage, ProtocolError> {
1244    let wire::ObjectId::ChangeId(state) = info.id else {
1245        return Err(ProtocolError::InvalidState(
1246            "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1247        ));
1248    };
1249    let state_id = state.to_string_full();
1250    let bytes = repo
1251        .get_state_visibility_bytes_for_state(&state)
1252        .map_err(|err| {
1253            ProtocolError::InvalidState(format!(
1254                "load state-visibility sidecar for {}: {err}",
1255                state_id
1256            ))
1257        })?
1258        .ok_or_else(|| {
1259            ProtocolError::InvalidState(format!(
1260                "server wants state visibility for state {} but sender has no sidecar",
1261                state_id
1262            ))
1263        })?;
1264    Ok(PushMessage {
1265        body: Some(push_message::Body::StateVisibility(
1266            StateVisibilityTransfer {
1267                state_id,
1268                state_visibility_blob: bytes.into(),
1269            },
1270        )),
1271    })
1272}
1273
1274fn load_thread_metadata(
1275    repo: &Repository,
1276    target_thread: &str,
1277    local_state: ChangeId,
1278) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1279    let thread_manager = ThreadManager::new(repo.heddle_dir());
1280    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1281}
1282
1283fn plan_pull_wants(
1284    repo: &Repository,
1285    remote_state: &ChangeId,
1286    full_closure_available: bool,
1287    objects_to_fetch: Vec<ObjectDescriptor>,
1288    allow_partial_fetch: bool,
1289) -> Result<PullWantPlan, ProtocolError> {
1290    if full_closure_available {
1291        return Ok(PullWantPlan {
1292            wants: Vec::new(),
1293            wanted_types: HashMap::new(),
1294            want_full_closure: true,
1295        });
1296    }
1297    let request_full_closure =
1298        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1299    let mut wants = Vec::with_capacity(objects_to_fetch.len());
1300    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1301
1302    for descriptor in objects_to_fetch {
1303        let info = parse_descriptor_to_info(descriptor)?;
1304        let pack_id = match &info.id {
1305            wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1306            wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1307        };
1308        let include = if request_full_closure {
1309            true
1310        } else {
1311            let has = wire::has_object(repo.store(), &info)?;
1312            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1313        };
1314
1315        if include {
1316            record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1317            wants.push(object_descriptor_with_status(
1318                &info,
1319                ObjectAvailabilityStatus::Missing,
1320                "requested by client",
1321            ));
1322        }
1323    }
1324
1325    Ok(PullWantPlan {
1326        wants,
1327        wanted_types,
1328        want_full_closure: false,
1329    })
1330}
1331
1332fn supports_compact_full_pull(
1333    repo: &Repository,
1334    allow_partial_fetch: bool,
1335    exclude_states: &[ChangeId],
1336) -> Result<bool, ProtocolError> {
1337    if allow_partial_fetch || !exclude_states.is_empty() {
1338        return Ok(false);
1339    }
1340    repo_looks_fresh(repo)
1341}
1342
1343/// For a bare `pull` (no explicit `--local-thread`), determine which locally
1344/// held head — if any — is safe to advertise to the server as an
1345/// `exclude_states` entry so the server prunes its closure to the delta.
1346///
1347/// Advertising state S asserts "I already hold S's FULL object closure
1348/// locally." If we advertise a head whose closure we do NOT fully have, the
1349/// server omits those objects and we silently end up with an incomplete repo.
1350/// The server trusts this assertion blindly, so the entire correctness burden
1351/// is here. We therefore advertise a head ONLY when:
1352///
1353/// 1. A target-state override is not in play (the override drives the want
1354///    plan directly; advertising the thread head would be unrelated).
1355/// 2. The local repo holds no recorded missing blobs (a partial/lazy clone
1356///    can hold a state's metadata while its blobs were never fetched — never
1357///    advertise such a head).
1358/// 3. The thread we're about to update (`remote_thread`) resolves to a local
1359///    head whose ENTIRE object closure is present locally — proven by walking
1360///    it with `enumerate_state_closure`, which errors `ObjectNotFound` on the
1361///    first absent state/tree/blob.
1362///
1363/// When any check fails we return `None` and the caller falls back to the
1364/// (correct, just slower) empty-exclude full pull. Correctness > speed.
1365fn locally_complete_pull_head(
1366    repo: &Repository,
1367    remote_thread: &str,
1368    target_state: Option<ChangeId>,
1369) -> Result<Option<ChangeId>, ProtocolError> {
1370    locally_complete_thread_head(repo, remote_thread, target_state)
1371}
1372
1373/// Same completeness gate for an explicit `--local-thread`: the user named the
1374/// local thread whose head should be advertised as already-held. The cardinal
1375/// risk is identical to the bare path — advertising a head whose closure we do
1376/// NOT fully hold makes the server prune objects we lack and silently leaves us
1377/// with an incomplete repo. A `--local-thread` pointed at a partial/lazy clone
1378/// or an interrupted prior pull is exactly that hazard, so it must clear the
1379/// same checks (no target-state override, no recorded missing blobs, full
1380/// closure present) before it may be advertised.
1381fn locally_complete_local_thread_head(
1382    repo: &Repository,
1383    local_thread: &str,
1384    target_state: Option<ChangeId>,
1385) -> Result<Option<ChangeId>, ProtocolError> {
1386    locally_complete_thread_head(repo, local_thread, target_state)
1387}
1388
1389/// Shared completeness gate: given the name of a thread whose head we are about
1390/// to advertise as an `exclude_states` entry, return that head ONLY when its
1391/// full object closure is provably present locally; otherwise `None` (caller
1392/// falls back to the correct, slower empty-exclude full pull).
1393///
1394/// Advertising state S asserts "I already hold S's FULL object closure
1395/// locally." If we advertise a head whose closure we do NOT fully have, the
1396/// server omits those objects and we silently end up with an incomplete repo.
1397/// The server trusts this assertion blindly, so the entire correctness burden
1398/// is here. We therefore advertise a head ONLY when:
1399///
1400/// 1. A target-state override is not in play (the override drives the want
1401///    plan directly; advertising the thread head would be unrelated).
1402/// 2. The local repo holds no recorded missing blobs (a partial/lazy clone
1403///    can hold a state's metadata while its blobs were never fetched — never
1404///    advertise such a head).
1405/// 3. The named thread resolves to a local head whose ENTIRE object closure is
1406///    present locally — proven by walking it with `enumerate_state_closure`,
1407///    which errors `ObjectNotFound` on the first absent state/tree/blob.
1408fn locally_complete_thread_head(
1409    repo: &Repository,
1410    thread: &str,
1411    target_state: Option<ChangeId>,
1412) -> Result<Option<ChangeId>, ProtocolError> {
1413    // A target-state override pulls a specific state, not the thread tip;
1414    // advertising the thread head here would not match what's being fetched.
1415    if target_state.is_some() {
1416        return Ok(None);
1417    }
1418    // A repo carrying known-missing blobs is partial/lazy: it may hold a
1419    // state's metadata while lacking its blob content. Never advertise.
1420    if !repo.missing_blobs()?.is_empty() {
1421        return Ok(None);
1422    }
1423    let Some(head) = repo.refs().get_thread(&ThreadName::from(thread))? else {
1424        // Fresh local repo (no local head for this thread) — nothing to
1425        // advertise; the server sends the full closure as before.
1426        return Ok(None);
1427    };
1428    // Prove the head's closure is fully present locally. `enumerate_state_closure`
1429    // loads every state, tree, and blob in the closure and errors `ObjectNotFound`
1430    // on the first absent object. A clean `Ok` is the completeness guarantee that
1431    // makes advertising this head safe.
1432    match wire::enumerate_state_closure(repo.store(), head) {
1433        Ok(_) => Ok(Some(head)),
1434        Err(ProtocolError::ObjectNotFound(_)) => Ok(None),
1435        Err(err) => Err(err),
1436    }
1437}
1438
1439fn should_request_full_closure(
1440    repo: &Repository,
1441    remote_state: &ChangeId,
1442    allow_partial_fetch: bool,
1443) -> Result<bool, ProtocolError> {
1444    if allow_partial_fetch || repo.store().has_state(remote_state)? {
1445        return Ok(false);
1446    }
1447    repo_looks_fresh(repo)
1448}
1449
1450fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1451    if repo.head()?.is_some() {
1452        return Ok(false);
1453    }
1454    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1455        return Ok(false);
1456    }
1457    Ok(repo.missing_blobs()?.is_empty())
1458}
1459
1460fn infer_installed_hash_object_type(
1461    repo: &Repository,
1462    hash: &ContentHash,
1463) -> Result<ObjectType, ProtocolError> {
1464    let store = repo.store();
1465    if store.get_tree(hash)?.is_some() {
1466        return Ok(ObjectType::Tree);
1467    }
1468    if store
1469        .get_action(&objects::object::ActionId::from_hash(*hash))?
1470        .is_some()
1471    {
1472        return Ok(ObjectType::Action);
1473    }
1474    Ok(ObjectType::Blob)
1475}
1476
1477fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1478    const HEADER: &str = "heddle-markers-v1\n";
1479    if checkpoint.is_empty() {
1480        return Ok(false);
1481    }
1482    let payload = std::str::from_utf8(checkpoint)
1483        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1484    let Some(lines) = payload.strip_prefix(HEADER) else {
1485        return Ok(false);
1486    };
1487
1488    for line in lines.lines() {
1489        if line.is_empty() {
1490            continue;
1491        }
1492        let Some((name, change_id)) = line.split_once('\t') else {
1493            return Err(ProtocolError::InvalidState(
1494                "invalid marker snapshot line".to_string(),
1495            ));
1496        };
1497        let change_id = ChangeId::parse(change_id)
1498            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1499        if !repo.store().has_state(&change_id)? {
1500            continue;
1501        }
1502        let name = MarkerName::from(name);
1503        match repo.refs().get_marker(&name)? {
1504            Some(existing) if existing == change_id => {}
1505            Some(existing) => repo.refs().set_marker_cas(
1506                &name,
1507                refs::RefExpectation::Value(existing),
1508                &change_id,
1509            )?,
1510            None => repo.refs().create_marker(&name, &change_id)?,
1511        }
1512    }
1513
1514    Ok(true)
1515}
1516
1517fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1518    if s.is_empty() {
1519        return Vec::new();
1520    }
1521    objects::object::ChangeId::parse(s)
1522        .map(|id| id.as_bytes().to_vec())
1523        .unwrap_or_default()
1524}
1525
1526fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1527    ThreadMetadata {
1528        name: metadata.thread.clone(),
1529        target_thread: metadata.target_thread.clone(),
1530        parent_thread: metadata.parent_thread.clone(),
1531        task: metadata.task.clone(),
1532        thread_mode: metadata.mode.to_string(),
1533        thread_state: metadata.state.to_string(),
1534        freshness: metadata.freshness.to_string(),
1535        base_state: change_id_string_to_bytes(&metadata.base_state),
1536        base_root: change_id_string_to_bytes(&metadata.base_root),
1537        current_state: metadata
1538            .current_state
1539            .as_deref()
1540            .map(change_id_string_to_bytes),
1541        merged_state: metadata
1542            .merged_state
1543            .as_deref()
1544            .map(change_id_string_to_bytes),
1545        changed_paths: metadata.changed_paths.clone(),
1546        impact_categories: metadata
1547            .impact_categories
1548            .iter()
1549            .map(ToString::to_string)
1550            .collect(),
1551        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1552        promotion_suggested: metadata.promotion_suggested,
1553        verification_summary: Some(ThreadVerificationSummary {
1554            tests_passed: metadata.verification_summary.tests_passed,
1555            tests_failed: metadata
1556                .verification_summary
1557                .tests_failed
1558                .unwrap_or_default(),
1559            coverage_pct: metadata.verification_summary.coverage_pct,
1560            lint_warnings: metadata.verification_summary.lint_warnings,
1561        }),
1562        confidence_summary: Some(ThreadConfidenceSummary {
1563            value: metadata.confidence_summary.value,
1564            band: metadata
1565                .confidence_summary
1566                .band
1567                .as_ref()
1568                .map(ToString::to_string),
1569        }),
1570        integration_policy_result: Some(ThreadIntegrationPolicy {
1571            status: metadata
1572                .integration_policy_result
1573                .status
1574                .clone()
1575                .unwrap_or_default(),
1576            reason: metadata
1577                .integration_policy_result
1578                .reason
1579                .clone()
1580                .unwrap_or_default(),
1581        }),
1582        created_at: Some(prost_types::Timestamp {
1583            seconds: metadata.created_at.timestamp(),
1584            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1585        }),
1586        updated_at: Some(prost_types::Timestamp {
1587            seconds: metadata.updated_at.timestamp(),
1588            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1589        }),
1590    }
1591}
1592
1593struct PullExchange {
1594    result: PullComplete,
1595    object_count: usize,
1596    profile: PullProfile,
1597}
1598
1599fn mark_missing_blobs_for_state(
1600    repo: &Repository,
1601    state_id: ChangeId,
1602) -> Result<(), ProtocolError> {
1603    let state = repo
1604        .store()
1605        .get_state(&state_id)?
1606        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1607    let mut missing = collect_missing_blobs(repo, &state.tree)?;
1608    if let Some(context_root) = state.context.as_ref() {
1609        missing.extend(collect_missing_blobs(repo, context_root)?);
1610    }
1611    if let Some(discussions_blob) = state.discussions.as_ref()
1612        && !repo.store().has_blob(discussions_blob)?
1613    {
1614        missing.push(*discussions_blob);
1615    }
1616    missing
1617        .into_iter()
1618        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1619}
1620
1621fn clear_missing_blobs_for_state(
1622    repo: &Repository,
1623    state_id: ChangeId,
1624) -> Result<(), ProtocolError> {
1625    let state = repo
1626        .store()
1627        .get_state(&state_id)?
1628        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1629    let mut missing = collect_missing_blobs(repo, &state.tree)?;
1630    if let Some(context_root) = state.context.as_ref() {
1631        missing.extend(collect_missing_blobs(repo, context_root)?);
1632    }
1633    if let Some(discussions_blob) = state.discussions.as_ref() {
1634        missing.push(*discussions_blob);
1635    }
1636    missing
1637        .into_iter()
1638        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1639}
1640
1641fn collect_missing_blobs(
1642    repo: &Repository,
1643    tree_hash: &ContentHash,
1644) -> Result<Vec<ContentHash>, ProtocolError> {
1645    let mut missing = Vec::new();
1646    collect_missing_blobs_recursive(repo, tree_hash, &mut missing)?;
1647    Ok(missing)
1648}
1649
1650fn collect_missing_blobs_recursive(
1651    repo: &Repository,
1652    tree_hash: &ContentHash,
1653    missing: &mut Vec<ContentHash>,
1654) -> Result<(), ProtocolError> {
1655    let Some(tree) = repo.store().get_tree(tree_hash).map_err(|err| {
1656        ProtocolError::InvalidState(format!(
1657            "load tree {} while collecting lazy hydration missing blobs: {err}",
1658            tree_hash.to_hex()
1659        ))
1660    })?
1661    else {
1662        return Ok(());
1663    };
1664    for entry in tree.entries() {
1665        match entry.entry_type {
1666            objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1667                if !repo.store().has_blob(&entry.hash).map_err(|err| {
1668                    ProtocolError::InvalidState(format!(
1669                        "check blob {} while collecting lazy hydration missing blobs: {err}",
1670                        entry.hash.to_hex()
1671                    ))
1672                })? {
1673                    missing.push(entry.hash);
1674                }
1675            }
1676            objects::object::EntryType::Tree => {
1677                collect_missing_blobs_recursive(repo, &entry.hash, missing)?;
1678            }
1679        }
1680    }
1681    Ok(())
1682}
1683
1684fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1685    match repo.missing_blobs() {
1686        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1687        Ok(_) => PartialFetchStatus::Disabled as i32,
1688        Err(_) => PartialFetchStatus::Unspecified as i32,
1689    }
1690}
1691
1692fn pull_transfer_id(
1693    repo_path: &str,
1694    remote_thread: &str,
1695    local_thread: Option<&str>,
1696    depth: Option<u32>,
1697    target_state: Option<ChangeId>,
1698) -> String {
1699    format!(
1700        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1701        local_thread.unwrap_or_default(),
1702        target_state
1703            .map(|value| value.to_string_full())
1704            .unwrap_or_default()
1705    )
1706}
1707
1708fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1709    format!(
1710        "push:{repo_path}:{}:{target_thread}",
1711        local_state.to_string_full()
1712    )
1713}
1714
1715fn native_revision_address(change_id: ChangeId) -> String {
1716    RevisionAddress::heddle(change_id).to_string()
1717}
1718
1719fn git_revision_address(commit_oid: &GitObjectId) -> String {
1720    RevisionAddress::git_commit(commit_oid.to_hex()).to_string()
1721}
1722
1723fn build_git_lane_push_plan(
1724    repo: &Repository,
1725    local_state: ChangeId,
1726    target_thread: &str,
1727    chunk_size: usize,
1728) -> Result<GitLanePushPlan, ProtocolError> {
1729    if repo.capability() != RepositoryCapability::GitOverlay {
1730        return Err(ProtocolError::InvalidState(
1731            "Git lane pushes require a git-overlay repository".to_string(),
1732        ));
1733    }
1734    let checkpoint = repo
1735        .latest_git_checkpoint_for_change(&local_state)
1736        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1737        .ok_or_else(|| {
1738            ProtocolError::InvalidState(format!(
1739                "state {} has no Git checkpoint; run `heddle checkpoint` before pushing this git-overlay spool",
1740                local_state.short()
1741            ))
1742        })?;
1743    let git_repo = repo
1744        .git_overlay_sley_repository()
1745        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1746        .ok_or_else(|| {
1747            ProtocolError::InvalidState("git-overlay repository has no Git store".to_string())
1748        })?;
1749    let commit_oid = GitObjectId::from_hex(git_repo.object_format(), &checkpoint.git_commit)
1750        .map_err(|err| {
1751            ProtocolError::InvalidState(format!(
1752                "checkpoint {} has invalid Git commit oid: {err}",
1753                checkpoint.git_commit
1754            ))
1755        })?;
1756    git_repo.read_commit(&commit_oid).map_err(|err| {
1757        ProtocolError::InvalidState(format!(
1758            "checkpoint {} is not a readable Git commit: {err}",
1759            checkpoint.git_commit
1760        ))
1761    })?;
1762
1763    let pack = build_git_lane_pack_plan(&git_repo, commit_oid, chunk_size)?;
1764    let ref_update = git_ref_update_message(local_state, target_thread, commit_oid, &checkpoint)?;
1765
1766    Ok(GitLanePushPlan {
1767        local_revision_address: git_revision_address(&commit_oid),
1768        pack,
1769        ref_update,
1770    })
1771}
1772
1773fn build_git_lane_pack_plan(
1774    git_repo: &SleyRepository,
1775    root: GitObjectId,
1776    chunk_size: usize,
1777) -> Result<GitPackPushPlan, ProtocolError> {
1778    let objects = git_repo.objects();
1779    let mut sink = io::sink();
1780    let pack = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
1781        objects.as_ref(),
1782        git_repo.object_format(),
1783        [root],
1784        &HashSet::new(),
1785        &mut sink,
1786    )
1787    .map_err(|err| {
1788        ProtocolError::InvalidState(format!(
1789            "plan reachable Git pack stream for {}: {err}",
1790            root.to_hex()
1791        ))
1792    })?
1793    .ok_or_else(|| {
1794        ProtocolError::InvalidState(format!(
1795            "checkpoint {} did not produce a reachable Git pack",
1796            root.to_hex()
1797        ))
1798    })?;
1799    if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
1800        return Err(ProtocolError::InvalidState(format!(
1801            "Git pack exceeds maximum transfer size of {} bytes",
1802            wire::MAX_RECEIVED_GIT_PACK_SIZE
1803        )));
1804    }
1805    let chunk_size = chunk_size.max(1) as u64;
1806    let chunk_count = pack.pack_size.div_ceil(chunk_size);
1807    if chunk_count > u32::MAX as u64 {
1808        return Err(ProtocolError::InvalidState(
1809            "Git pack chunk count exceeds u32".to_string(),
1810        ));
1811    }
1812    Ok(GitPackPushPlan {
1813        transfer_id: format!("git-pack:{}", root.to_hex()),
1814        pack_id: pack.checksum.as_bytes().to_vec(),
1815        pack_size: pack.pack_size,
1816        root,
1817        git_repo: git_repo.clone(),
1818    })
1819}
1820
1821async fn send_git_pack_streaming_messages(
1822    tx: &mpsc::Sender<PushMessage>,
1823    pack: &GitPackPushPlan,
1824    chunk_size: usize,
1825) -> Result<(), ProtocolError> {
1826    let tx = tx.clone();
1827    let pack = pack.clone();
1828    tokio::task::spawn_blocking(move || stream_git_pack_messages_blocking(tx, pack, chunk_size))
1829        .await
1830        .map_err(|err| {
1831            ProtocolError::InvalidState(format!("Git pack streaming task failed: {err}"))
1832        })?
1833}
1834
1835fn stream_git_pack_messages_blocking(
1836    tx: mpsc::Sender<PushMessage>,
1837    pack: GitPackPushPlan,
1838    chunk_size: usize,
1839) -> Result<(), ProtocolError> {
1840    let objects = pack.git_repo.objects();
1841    let mut writer = GitPackPushMessageWriter::new(
1842        tx,
1843        pack.transfer_id.clone(),
1844        pack.pack_id.clone(),
1845        pack.pack_size,
1846        chunk_size,
1847    );
1848    let summary = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
1849        objects.as_ref(),
1850        pack.git_repo.object_format(),
1851        [pack.root],
1852        &HashSet::new(),
1853        &mut writer,
1854    )
1855    .map_err(|err| {
1856        ProtocolError::InvalidState(format!(
1857            "stream reachable Git pack for {}: {err}",
1858            pack.root.to_hex()
1859        ))
1860    })?
1861    .ok_or_else(|| {
1862        ProtocolError::InvalidState(format!(
1863            "checkpoint {} did not produce a reachable Git pack",
1864            pack.root.to_hex()
1865        ))
1866    })?;
1867    writer.finish()?;
1868    if summary.pack_size != pack.pack_size || summary.checksum.as_bytes() != pack.pack_id.as_slice()
1869    {
1870        return Err(ProtocolError::InvalidState(format!(
1871            "Git pack stream changed while sending {}; expected {} bytes/{}, got {} bytes/{}",
1872            pack.root.to_hex(),
1873            pack.pack_size,
1874            hex::encode(&pack.pack_id),
1875            summary.pack_size,
1876            summary.checksum.to_hex()
1877        )));
1878    }
1879    Ok(())
1880}
1881
1882struct GitPackPushMessageWriter {
1883    tx: mpsc::Sender<PushMessage>,
1884    transfer_id: String,
1885    pack_id: Vec<u8>,
1886    pack_size: u64,
1887    chunk_size: usize,
1888    buffer: Vec<u8>,
1889    offset: u64,
1890    chunk_index: u32,
1891}
1892
1893impl GitPackPushMessageWriter {
1894    fn new(
1895        tx: mpsc::Sender<PushMessage>,
1896        transfer_id: String,
1897        pack_id: Vec<u8>,
1898        pack_size: u64,
1899        chunk_size: usize,
1900    ) -> Self {
1901        let chunk_size = chunk_size.max(1);
1902        Self {
1903            tx,
1904            transfer_id,
1905            pack_id,
1906            pack_size,
1907            chunk_size,
1908            buffer: Vec::with_capacity(chunk_size),
1909            offset: 0,
1910            chunk_index: 0,
1911        }
1912    }
1913
1914    fn send_buffer(&mut self) -> io::Result<()> {
1915        if self.buffer.is_empty() {
1916            return Ok(());
1917        }
1918        let chunk = std::mem::take(&mut self.buffer);
1919        let next_offset = self.offset.checked_add(chunk.len() as u64).ok_or_else(|| {
1920            io::Error::new(io::ErrorKind::InvalidData, "Git pack offset overflow")
1921        })?;
1922        if next_offset > self.pack_size {
1923            return Err(io::Error::new(
1924                io::ErrorKind::InvalidData,
1925                format!(
1926                    "Git pack stream exceeded planned size {}; got at least {}",
1927                    self.pack_size, next_offset
1928                ),
1929            ));
1930        }
1931        let is_final_chunk = next_offset == self.pack_size;
1932        self.tx
1933            .blocking_send(git_lane_push_message(git_lane_transfer::Body::Pack(
1934                GitPackTransfer {
1935                    transfer_id: self.transfer_id.clone(),
1936                    offset: self.offset,
1937                    chunk_index: self.chunk_index,
1938                    is_final_chunk,
1939                    pack_size: self.pack_size,
1940                    pack_chunk: chunk.into(),
1941                    pack_id: self.pack_id.clone().into(),
1942                },
1943            )))
1944            .map_err(|_| {
1945                io::Error::new(io::ErrorKind::BrokenPipe, "push stream closed unexpectedly")
1946            })?;
1947        self.offset = next_offset;
1948        self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
1949            io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk index overflow")
1950        })?;
1951        Ok(())
1952    }
1953
1954    fn finish(mut self) -> Result<(), ProtocolError> {
1955        self.send_buffer().map_err(|err| {
1956            ProtocolError::InvalidState(format!("send final Git pack chunk: {err}"))
1957        })?;
1958        if self.offset != self.pack_size {
1959            return Err(ProtocolError::InvalidState(format!(
1960                "Git pack stream length mismatch: expected {}, got {}",
1961                self.pack_size, self.offset
1962            )));
1963        }
1964        Ok(())
1965    }
1966}
1967
1968impl Write for GitPackPushMessageWriter {
1969    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
1970        let written = buf.len();
1971        while !buf.is_empty() {
1972            if self.offset == self.pack_size {
1973                return Err(io::Error::new(
1974                    io::ErrorKind::InvalidData,
1975                    format!("Git pack stream wrote past planned size {}", self.pack_size),
1976                ));
1977            }
1978            let capacity = self
1979                .chunk_size
1980                .checked_sub(self.buffer.len())
1981                .ok_or_else(|| {
1982                    io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk buffer overflow")
1983                })?;
1984            let take = capacity.min(buf.len());
1985            self.buffer.extend_from_slice(&buf[..take]);
1986            buf = &buf[take..];
1987            if self.buffer.len() == self.chunk_size {
1988                self.send_buffer()?;
1989            }
1990        }
1991        Ok(written)
1992    }
1993
1994    fn flush(&mut self) -> io::Result<()> {
1995        Ok(())
1996    }
1997}
1998
1999fn git_ref_update_message(
2000    local_state: ChangeId,
2001    target_thread: &str,
2002    commit_oid: GitObjectId,
2003    checkpoint: &GitCheckpointRecord,
2004) -> Result<PushMessage, ProtocolError> {
2005    let metadata_json = serde_json::json!({
2006        "source": "heddle-checkpoint",
2007        "summary": checkpoint.summary,
2008        "committed_at": checkpoint.committed_at,
2009    })
2010    .to_string();
2011    Ok(git_lane_push_message(git_lane_transfer::Body::RefUpdate(
2012        GitRefUpdateTransfer {
2013            name: format!("refs/heads/{target_thread}"),
2014            kind: GitRefKind::Branch as i32,
2015            target_oid: commit_oid.as_bytes().to_vec().into(),
2016            peeled_oid: Vec::new().into(),
2017            expected_missing: false,
2018            expected_target_oid: Vec::new().into(),
2019            checkpoint: Some(GitCheckpointTransfer {
2020                heddle_change_id: local_state.as_bytes().to_vec().into(),
2021                git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2022                thread: target_thread.to_string(),
2023                metadata_json,
2024            }),
2025        },
2026    )))
2027}
2028
2029fn apply_git_ref_expectation(
2030    message: &mut PushMessage,
2031    remote_revision_address: &str,
2032) -> Result<(), ProtocolError> {
2033    let expectation = parse_git_ref_expectation(remote_revision_address)?;
2034    let Some(push_message::Body::GitLane(GitLaneTransfer {
2035        body: Some(git_lane_transfer::Body::RefUpdate(update)),
2036    })) = message.body.as_mut()
2037    else {
2038        return Err(ProtocolError::InvalidState(
2039            "Git lane push plan missing ref update message".to_string(),
2040        ));
2041    };
2042    match &expectation {
2043        GitRefRemoteExpectation::Missing => {
2044            update.expected_missing = true;
2045            update.expected_target_oid = Vec::new().into();
2046        }
2047        GitRefRemoteExpectation::Value(oid) => {
2048            update.expected_missing = false;
2049            update.expected_target_oid = oid.clone().into();
2050        }
2051    }
2052    Ok(())
2053}
2054
2055#[derive(Clone, Debug, PartialEq, Eq)]
2056enum GitRefRemoteExpectation {
2057    Missing,
2058    Value(Vec<u8>),
2059}
2060
2061fn parse_git_ref_expectation(
2062    remote_revision_address: &str,
2063) -> Result<GitRefRemoteExpectation, ProtocolError> {
2064    if remote_revision_address.is_empty() || remote_revision_address.starts_with("heddle:") {
2065        return Ok(GitRefRemoteExpectation::Missing);
2066    }
2067    let Some(hex_oid) = remote_revision_address.strip_prefix("git:") else {
2068        return Err(ProtocolError::InvalidState(format!(
2069            "server returned unsupported remote_revision_address {remote_revision_address:?}"
2070        )));
2071    };
2072    let oid = hex::decode(hex_oid).map_err(|err| {
2073        ProtocolError::InvalidState(format!(
2074            "server returned invalid Git remote_revision_address: {err}"
2075        ))
2076    })?;
2077    match oid.len() {
2078        20 | 32 => Ok(GitRefRemoteExpectation::Value(oid)),
2079        len => Err(ProtocolError::InvalidState(format!(
2080            "server returned Git remote_revision_address with {len} bytes; expected SHA-1 or SHA-256"
2081        ))),
2082    }
2083}
2084
2085fn git_lane_push_message(body: git_lane_transfer::Body) -> PushMessage {
2086    PushMessage {
2087        body: Some(push_message::Body::GitLane(GitLaneTransfer {
2088            body: Some(body),
2089        })),
2090    }
2091}
2092
2093fn git_lane_transfer_size(transfer: &GitLaneTransfer) -> usize {
2094    match transfer.body.as_ref() {
2095        Some(git_lane_transfer::Body::Pack(pack)) => pack.pack_chunk.len(),
2096        Some(git_lane_transfer::Body::RefUpdate(update)) => update
2097            .target_oid
2098            .len()
2099            .saturating_add(update.peeled_oid.len())
2100            .saturating_add(update.expected_target_oid.len())
2101            .saturating_add(
2102                update
2103                    .checkpoint
2104                    .as_ref()
2105                    .map(git_checkpoint_transfer_size)
2106                    .unwrap_or_default(),
2107            ),
2108        Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2109            git_checkpoint_transfer_size(checkpoint)
2110        }
2111        None => 0,
2112    }
2113}
2114
2115fn git_checkpoint_transfer_size(checkpoint: &GitCheckpointTransfer) -> usize {
2116    checkpoint
2117        .heddle_change_id
2118        .len()
2119        .saturating_add(checkpoint.git_commit_oid.len())
2120        .saturating_add(checkpoint.thread.len())
2121        .saturating_add(checkpoint.metadata_json.len())
2122}
2123
2124#[derive(Default)]
2125struct GitPackPullInstallState {
2126    active: Option<GitPackPullInstall>,
2127}
2128
2129impl GitPackPullInstallState {
2130    fn ensure_idle(&self) -> Result<(), ProtocolError> {
2131        if self.active.is_none() {
2132            Ok(())
2133        } else {
2134            Err(ProtocolError::InvalidState(
2135                "Git pack transfer ended before final chunk".to_string(),
2136            ))
2137        }
2138    }
2139
2140    fn receive_chunk(
2141        &mut self,
2142        git_repo: &SleyRepository,
2143        pack: GitPackTransfer,
2144    ) -> Result<(), ProtocolError> {
2145        if pack.transfer_id.is_empty() {
2146            return Err(ProtocolError::InvalidState(
2147                "Git pack transfer_id is required".to_string(),
2148            ));
2149        }
2150        if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
2151            return Err(ProtocolError::InvalidState(format!(
2152                "Git pack exceeds maximum transfer size of {} bytes",
2153                wire::MAX_RECEIVED_GIT_PACK_SIZE
2154            )));
2155        }
2156        if pack.pack_chunk.is_empty() {
2157            return Err(ProtocolError::InvalidState(
2158                "Git pack chunk must not be empty".to_string(),
2159            ));
2160        }
2161        let pack_id =
2162            GitObjectId::from_raw(git_repo.object_format(), &pack.pack_id).map_err(|err| {
2163                ProtocolError::InvalidState(format!("GitPackTransfer.pack_id: {err}"))
2164            })?;
2165        if self.active.is_none() {
2166            if pack.offset != 0 {
2167                return Err(ProtocolError::InvalidState(format!(
2168                    "Git pack offset mismatch: expected 0, got {}",
2169                    pack.offset
2170                )));
2171            }
2172            if pack.chunk_index != 0 {
2173                return Err(ProtocolError::InvalidState(format!(
2174                    "Git pack chunk index mismatch: expected 0, got {}",
2175                    pack.chunk_index
2176                )));
2177            }
2178            let writer = git_repo
2179                .objects()
2180                .begin_raw_pack_install(pack_id, pack.pack_size)
2181                .map_err(|err| {
2182                    ProtocolError::InvalidState(format!("begin Git pack install: {err}"))
2183                })?;
2184            self.active = Some(GitPackPullInstall {
2185                transfer_id: pack.transfer_id.clone(),
2186                pack_id,
2187                pack_size: pack.pack_size,
2188                next_offset: 0,
2189                next_chunk_index: 0,
2190                writer,
2191            });
2192        }
2193
2194        let active = self.active.as_mut().ok_or_else(|| {
2195            ProtocolError::InvalidState("Git pack install not active".to_string())
2196        })?;
2197        active.receive_chunk(&pack, pack_id)?;
2198        if pack.is_final_chunk {
2199            let active = self.active.take().ok_or_else(|| {
2200                ProtocolError::InvalidState("Git pack install not active".to_string())
2201            })?;
2202            active.finish()?;
2203            git_repo.refresh_objects();
2204        }
2205        Ok(())
2206    }
2207}
2208
2209struct GitPackPullInstall {
2210    transfer_id: String,
2211    pack_id: GitObjectId,
2212    pack_size: u64,
2213    next_offset: u64,
2214    next_chunk_index: u32,
2215    writer: sley::plumbing::sley_odb::RawPackStreamingInstall,
2216}
2217
2218impl GitPackPullInstall {
2219    fn receive_chunk(
2220        &mut self,
2221        pack: &GitPackTransfer,
2222        pack_id: GitObjectId,
2223    ) -> Result<(), ProtocolError> {
2224        if self.transfer_id != pack.transfer_id {
2225            return Err(ProtocolError::InvalidState(format!(
2226                "Git pack transfer id changed from {:?} to {:?}",
2227                self.transfer_id, pack.transfer_id
2228            )));
2229        }
2230        if self.pack_id != pack_id {
2231            return Err(ProtocolError::InvalidState(
2232                "Git pack id changed during transfer".to_string(),
2233            ));
2234        }
2235        if self.pack_size != pack.pack_size {
2236            return Err(ProtocolError::InvalidState(
2237                "Git pack size changed during transfer".to_string(),
2238            ));
2239        }
2240        if pack.offset != self.next_offset {
2241            return Err(ProtocolError::InvalidState(format!(
2242                "Git pack offset mismatch: expected {}, got {}",
2243                self.next_offset, pack.offset
2244            )));
2245        }
2246        if pack.chunk_index != self.next_chunk_index {
2247            return Err(ProtocolError::InvalidState(format!(
2248                "Git pack chunk index mismatch: expected {}, got {}",
2249                self.next_chunk_index, pack.chunk_index
2250            )));
2251        }
2252        let chunk_len = u64::try_from(pack.pack_chunk.len()).map_err(|_| {
2253            ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
2254        })?;
2255        let next_offset = self
2256            .next_offset
2257            .checked_add(chunk_len)
2258            .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
2259        if next_offset > self.pack_size {
2260            return Err(ProtocolError::InvalidState(
2261                "Git pack chunk exceeds declared pack size".to_string(),
2262            ));
2263        }
2264        self.writer.write_all(&pack.pack_chunk).map_err(|err| {
2265            ProtocolError::InvalidState(format!("write streamed Git pack chunk: {err}"))
2266        })?;
2267        self.next_offset = next_offset;
2268        self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
2269            ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
2270        })?;
2271        if pack.is_final_chunk {
2272            if self.next_offset != self.pack_size {
2273                return Err(ProtocolError::InvalidState(format!(
2274                    "Git pack final size mismatch: declared {}, received {}",
2275                    self.pack_size, self.next_offset
2276                )));
2277            }
2278        } else if self.next_offset == self.pack_size {
2279            return Err(ProtocolError::InvalidState(
2280                "Git pack reached declared size without final chunk marker".to_string(),
2281            ));
2282        }
2283        Ok(())
2284    }
2285
2286    fn finish(self) -> Result<(), ProtocolError> {
2287        self.writer
2288            .finish()
2289            .map_err(|err| ProtocolError::InvalidState(format!("install Git pack: {err}")))?;
2290        Ok(())
2291    }
2292}
2293
2294fn accept_git_lane_pull_transfer(
2295    repo: &Repository,
2296    git_repo: &mut Option<SleyRepository>,
2297    git_pack_state: &mut GitPackPullInstallState,
2298    transfer: GitLaneTransfer,
2299) -> Result<(), ProtocolError> {
2300    if repo.capability() != RepositoryCapability::GitOverlay {
2301        return Err(ProtocolError::InvalidState(format!(
2302            "received git-lane pull transfer for non-GitOverlay repository (capability {:?})",
2303            repo.capability()
2304        )));
2305    }
2306    match transfer.body {
2307        Some(git_lane_transfer::Body::Pack(pack)) => {
2308            accept_git_lane_pack(repo, git_repo, git_pack_state, pack)
2309        }
2310        Some(git_lane_transfer::Body::RefUpdate(update)) => {
2311            accept_git_lane_ref_update(repo, git_repo, update)
2312        }
2313        Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2314            record_git_lane_checkpoint(repo, git_lane_sley_repository(repo, git_repo)?, checkpoint)
2315        }
2316        None => Err(ProtocolError::InvalidState(
2317            "GitLaneTransfer body is required".to_string(),
2318        )),
2319    }
2320}
2321
2322fn accept_git_lane_pack(
2323    repo: &Repository,
2324    git_repo: &mut Option<SleyRepository>,
2325    git_pack_state: &mut GitPackPullInstallState,
2326    pack: GitPackTransfer,
2327) -> Result<(), ProtocolError> {
2328    let git_repo = git_lane_sley_repository(repo, git_repo)?;
2329    git_pack_state.receive_chunk(git_repo, pack)?;
2330    Ok(())
2331}
2332
2333fn accept_git_lane_ref_update(
2334    repo: &Repository,
2335    git_repo: &mut Option<SleyRepository>,
2336    update: GitRefUpdateTransfer,
2337) -> Result<(), ProtocolError> {
2338    let git_repo = git_lane_sley_repository(repo, git_repo)?;
2339    let target = git_oid_from_bytes(
2340        git_repo,
2341        "GitRefUpdateTransfer.target_oid",
2342        &update.target_oid,
2343    )?;
2344    git_repo.read_commit(&target).map_err(|err| {
2345        ProtocolError::InvalidState(format!(
2346            "Git ref {} target commit {} is not present after pack receive: {err}",
2347            update.name,
2348            target.to_hex()
2349        ))
2350    })?;
2351    let refs = git_repo.references();
2352    let mut tx = refs.transaction();
2353    tx.update_to(
2354        update.name.clone(),
2355        ReferenceTarget::Direct(target),
2356        RefPrecondition::Any,
2357        None,
2358    );
2359    tx.commit().map_err(|err| {
2360        ProtocolError::InvalidState(format!("update Git ref {}: {err}", update.name))
2361    })?;
2362    if let Some(checkpoint) = update.checkpoint {
2363        record_git_lane_checkpoint(repo, git_repo, checkpoint)?;
2364    }
2365    Ok(())
2366}
2367
2368fn record_git_lane_checkpoint(
2369    repo: &Repository,
2370    git_repo: &SleyRepository,
2371    checkpoint: GitCheckpointTransfer,
2372) -> Result<(), ProtocolError> {
2373    let state = ChangeId::try_from_slice(&checkpoint.heddle_change_id)
2374        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2375    let commit_oid = git_oid_from_bytes(
2376        git_repo,
2377        "GitCheckpointTransfer.git_commit_oid",
2378        &checkpoint.git_commit_oid,
2379    )?;
2380    let commit_hex = commit_oid.to_hex();
2381    if repo
2382        .latest_git_checkpoint_for_change(&state)
2383        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2384        .is_some_and(|record| record.git_commit == commit_hex)
2385    {
2386        return Ok(());
2387    }
2388    let summary = serde_json::from_str::<serde_json::Value>(&checkpoint.metadata_json)
2389        .ok()
2390        .and_then(|metadata| {
2391            metadata
2392                .get("summary")
2393                .and_then(serde_json::Value::as_str)
2394                .map(str::to_string)
2395        })
2396        .unwrap_or_else(|| "pulled Git checkpoint".to_string());
2397    repo.record_git_checkpoint(&state, commit_hex, summary)
2398        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2399    Ok(())
2400}
2401
2402fn git_lane_sley_repository<'a>(
2403    repo: &Repository,
2404    git_repo: &'a mut Option<SleyRepository>,
2405) -> Result<&'a SleyRepository, ProtocolError> {
2406    if git_repo.is_none() {
2407        *git_repo = Some(
2408            repo.git_overlay_sley_repository()
2409                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2410                .ok_or_else(|| {
2411                    ProtocolError::InvalidState(
2412                        "git-overlay repository has no Git store".to_string(),
2413                    )
2414                })?,
2415        );
2416    }
2417    match git_repo.as_ref() {
2418        Some(git_repo) => Ok(git_repo),
2419        None => Err(ProtocolError::InvalidState(
2420            "git-overlay repository has no Git store".to_string(),
2421        )),
2422    }
2423}
2424
2425fn git_oid_from_bytes(
2426    git_repo: &SleyRepository,
2427    field: &str,
2428    bytes: &[u8],
2429) -> Result<GitObjectId, ProtocolError> {
2430    GitObjectId::from_raw(git_repo.object_format(), bytes)
2431        .map_err(|err| ProtocolError::InvalidState(format!("{field}: {err}")))
2432}
2433
2434async fn send_native_pack_streaming_messages(
2435    tx: &mpsc::Sender<PushMessage>,
2436    repo: &Repository,
2437    objects: &[ObjectInfo],
2438    transfer_id: &str,
2439    chunk_size: usize,
2440    transport: &super::helpers::HostedTransportPolicy,
2441    transport_mode: TransportMode,
2442) -> Result<(), ProtocolError> {
2443    let object_count = u64::try_from(objects.len()).map_err(|_| {
2444        ProtocolError::InvalidState("native pack object count exceeds u64".to_string())
2445    })?;
2446    let mut writer = wire::NativePackStreamingWriter::new_in(repo.heddle_dir(), object_count)?;
2447    let mut pack_reader = wire::GrowingPackChunkReader::open(writer.pack_path(), chunk_size)?;
2448    let (loaded_tx, mut loaded_rx) = mpsc::channel::<(
2449        usize,
2450        Result<wire::ObjectData, ProtocolError>,
2451    )>(NATIVE_PACK_OBJECT_PREFETCH_LIMIT);
2452    let store = repo.store().clone();
2453    let object_plan = objects.to_vec();
2454    let loader = tokio::task::spawn_blocking(move || {
2455        load_native_pack_objects_parallel(store, object_plan, loaded_tx);
2456    });
2457
2458    let mut next_index = 0usize;
2459    let mut pending = BTreeMap::new();
2460    while next_index < objects.len() {
2461        let (index, object) = loaded_rx.recv().await.ok_or_else(|| {
2462            ProtocolError::InvalidState(
2463                "native pack object loader stopped before sending all objects".to_string(),
2464            )
2465        })?;
2466        pending.insert(index, object);
2467
2468        while let Some(object) = pending.remove(&next_index) {
2469            let object = object?;
2470            let should_drain = object.data.len() >= chunk_size
2471                || (next_index + 1).is_multiple_of(NATIVE_PACK_DRAIN_OBJECT_INTERVAL);
2472            writer.add_object_data(object)?;
2473            if should_drain {
2474                writer.flush_pack()?;
2475                drain_growing_native_pack_stream(
2476                    tx,
2477                    &mut pack_reader,
2478                    false,
2479                    PackStreamKind::Pack,
2480                    transfer_id,
2481                    transport,
2482                    transport_mode,
2483                )
2484                .await?;
2485            }
2486            next_index += 1;
2487        }
2488    }
2489    loader.await.map_err(|err| {
2490        ProtocolError::InvalidState(format!("native pack object loader task failed: {err}"))
2491    })?;
2492
2493    let bundle = writer.finish()?;
2494    drain_growing_native_pack_stream(
2495        tx,
2496        &mut pack_reader,
2497        true,
2498        PackStreamKind::Pack,
2499        transfer_id,
2500        transport,
2501        transport_mode,
2502    )
2503    .await?;
2504    send_native_pack_file_stream(
2505        tx,
2506        &bundle.index_path,
2507        PackStreamKind::Index,
2508        transfer_id,
2509        chunk_size,
2510        transport,
2511        transport_mode,
2512    )
2513    .await
2514}
2515
2516fn load_native_pack_objects_parallel(
2517    store: AnyStore,
2518    objects: Vec<ObjectInfo>,
2519    tx: mpsc::Sender<(usize, Result<wire::ObjectData, ProtocolError>)>,
2520) {
2521    if objects.is_empty() {
2522        return;
2523    }
2524    let worker_count = native_pack_object_load_worker_count(objects.len());
2525    let objects = Arc::new(objects);
2526    let next_index = Arc::new(AtomicUsize::new(0));
2527
2528    std::thread::scope(|scope| {
2529        for _ in 0..worker_count {
2530            let store = store.clone();
2531            let objects = Arc::clone(&objects);
2532            let next_index = Arc::clone(&next_index);
2533            let tx = tx.clone();
2534            scope.spawn(move || {
2535                loop {
2536                    let index = next_index.fetch_add(1, Ordering::Relaxed);
2537                    let Some(info) = objects.get(index) else {
2538                        break;
2539                    };
2540                    let object = wire::load_object_data(&store, &info.id, info.obj_type);
2541                    if tx.blocking_send((index, object)).is_err() {
2542                        break;
2543                    }
2544                }
2545            });
2546        }
2547    });
2548}
2549
2550fn native_pack_object_load_worker_count(object_count: usize) -> usize {
2551    let available = std::thread::available_parallelism()
2552        .map(usize::from)
2553        .unwrap_or(1);
2554    object_count
2555        .min(available)
2556        .clamp(1, NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT)
2557}
2558
2559async fn drain_growing_native_pack_stream(
2560    tx: &mpsc::Sender<PushMessage>,
2561    reader: &mut wire::GrowingPackChunkReader,
2562    final_stream: bool,
2563    stream_kind: PackStreamKind,
2564    transfer_id: &str,
2565    transport: &super::helpers::HostedTransportPolicy,
2566    transport_mode: TransportMode,
2567) -> Result<(), ProtocolError> {
2568    while let Some((offset, chunk_index, data, is_final_chunk)) =
2569        reader.next_available_chunk(final_stream)?
2570    {
2571        send_pack_chunk(
2572            tx,
2573            stream_kind,
2574            data,
2575            transfer_id,
2576            transport,
2577            transport_mode,
2578            chunk_index,
2579            offset,
2580            is_final_chunk,
2581        )
2582        .await?;
2583    }
2584    Ok(())
2585}
2586
2587async fn send_native_pack_file_stream(
2588    tx: &mpsc::Sender<PushMessage>,
2589    path: &std::path::Path,
2590    stream_kind: PackStreamKind,
2591    transfer_id: &str,
2592    chunk_size: usize,
2593    transport: &super::helpers::HostedTransportPolicy,
2594    transport_mode: TransportMode,
2595) -> Result<(), ProtocolError> {
2596    let mut reader = wire::PackFileChunkReader::open(path, chunk_size)?;
2597    while let Some((offset, chunk_index, data, is_final_chunk)) = reader.next_chunk()? {
2598        send_pack_chunk(
2599            tx,
2600            stream_kind,
2601            data,
2602            transfer_id,
2603            transport,
2604            transport_mode,
2605            chunk_index,
2606            offset,
2607            is_final_chunk,
2608        )
2609        .await?;
2610    }
2611    Ok(())
2612}
2613
2614#[allow(clippy::too_many_arguments)]
2615async fn send_pack_chunk(
2616    tx: &mpsc::Sender<PushMessage>,
2617    stream_kind: PackStreamKind,
2618    data: Vec<u8>,
2619    transfer_id: &str,
2620    transport: &super::helpers::HostedTransportPolicy,
2621    transport_mode: TransportMode,
2622    chunk_index: u32,
2623    offset: u64,
2624    is_final_chunk: bool,
2625) -> Result<(), ProtocolError> {
2626    let chunk_length = data.len().min(u32::MAX as usize) as u32;
2627    tx.send(PushMessage {
2628        body: Some(push_message::Body::Pack(PackChunk {
2629            stream_kind: stream_kind as i32,
2630            data: data.into(),
2631            transfer: Some(transport.transfer_checkpoint_with_mode(
2632                transfer_id,
2633                transport_mode,
2634                chunk_index,
2635                offset,
2636                is_final_chunk,
2637            )),
2638            chunk_length,
2639            is_final_chunk,
2640        })),
2641    })
2642    .await
2643    .map_err(|_| ProtocolError::InvalidState("push stream closed unexpectedly".to_string()))
2644}
2645
2646fn preferred_transport_mode(
2647    transport: &super::helpers::HostedTransportPolicy,
2648    object_count: usize,
2649) -> TransportMode {
2650    let _ = transport;
2651    let _ = object_count;
2652    TransportMode::NativePack
2653}
2654
2655#[cfg(test)]
2656mod tests {
2657    use chrono::{TimeZone, Utc};
2658    use cli_shared::ClientConfig;
2659    use grpc::heddle::v1::{
2660        ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
2661        TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
2662        repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
2663    };
2664    use objects::{
2665        object::{
2666            Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
2667            StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
2668        },
2669        store::ObjectStore,
2670    };
2671    use tempfile::TempDir;
2672    use tonic::{Response, Status, transport::Server};
2673    use wire::{ObjectId, ObjectInfo};
2674
2675    use super::*;
2676    use crate::grpc_hosted::helpers::{descriptor_id_from_info, to_proto_object_info};
2677
2678    fn temp_repo() -> (TempDir, Repository) {
2679        let dir = TempDir::new().expect("tempdir");
2680        let repo = Repository::init_default(dir.path()).expect("init repo");
2681        (dir, repo)
2682    }
2683
2684    /// An unseeded repo with no thread heads — the shape `heddle clone`
2685    /// creates locally before its first pull (`Repository::init`, not
2686    /// `init_default`).
2687    fn temp_repo_unseeded() -> (TempDir, Repository) {
2688        let dir = TempDir::new().expect("tempdir");
2689        let repo = Repository::init(dir.path()).expect("init repo");
2690        (dir, repo)
2691    }
2692
2693    fn redaction_info(blob: ContentHash) -> ObjectInfo {
2694        ObjectInfo {
2695            id: ObjectId::Hash(blob),
2696            obj_type: ObjectType::Redaction,
2697            size: 0,
2698            delta_base: None,
2699        }
2700    }
2701
2702    fn state_info(state: ChangeId) -> ObjectInfo {
2703        ObjectInfo {
2704            id: ObjectId::ChangeId(state),
2705            obj_type: ObjectType::State,
2706            size: 0,
2707            delta_base: None,
2708        }
2709    }
2710
2711    fn state_visibility_info(state: ChangeId) -> ObjectInfo {
2712        ObjectInfo {
2713            id: ObjectId::ChangeId(state),
2714            obj_type: ObjectType::StateVisibility,
2715            size: 0,
2716            delta_base: None,
2717        }
2718    }
2719
2720    fn sample_blob() -> ContentHash {
2721        ContentHash::from_bytes([7u8; 32])
2722    }
2723
2724    #[test]
2725    fn descriptor_id_from_info_matches_proto_encode_path() {
2726        let infos = [
2727            redaction_info(sample_blob()),
2728            state_info(ChangeId::from_bytes([3u8; 16])),
2729            state_visibility_info(ChangeId::from_bytes([9u8; 16])),
2730        ];
2731        for info in infos {
2732            assert_eq!(
2733                descriptor_id_from_info(&info),
2734                descriptor_id(&to_proto_object_info(&info)),
2735                "keying path must stay byte-identical to the throwaway-encode path",
2736            );
2737        }
2738    }
2739
2740    #[test]
2741    fn git_lane_messages_pack_reachable_commit_graph_and_attach_checkpoint() {
2742        let dir = TempDir::new().expect("tempdir");
2743        let git = sley::Repository::init(dir.path()).expect("init git");
2744        let blob_oid = git.write_blob(b"hello\n").expect("write blob");
2745        let tree = sley::TreeObject {
2746            entries: vec![sley::plumbing::sley_object::TreeEntry {
2747                mode: 0o100644,
2748                name: sley::BString::from(b"hello.txt"),
2749                oid: blob_oid,
2750            }],
2751        };
2752        let tree_oid = git
2753            .write_raw_object(sley::GitObjectType::Tree, tree.write())
2754            .expect("write tree");
2755        let commit = sley::CommitObject {
2756            tree: tree_oid,
2757            parents: Vec::new(),
2758            author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2759            committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2760            encoding: None,
2761            message: b"checkpoint\n".to_vec(),
2762        };
2763        let commit_oid = git
2764            .write_raw_object(sley::GitObjectType::Commit, commit.write())
2765            .expect("write commit");
2766
2767        let pack = build_git_lane_pack_plan(&git, commit_oid, 64 * 1024)
2768            .expect("build git lane pack plan");
2769        assert_eq!(pack.pack_id.len(), git.object_format().raw_len());
2770        let (tx, mut rx) = mpsc::channel(8);
2771        stream_git_pack_messages_blocking(tx, pack.clone(), 64 * 1024)
2772            .expect("stream git lane pack");
2773        let mut pack_bytes = Vec::new();
2774        let mut chunks = Vec::new();
2775        while let Some(chunk) = rx.blocking_recv() {
2776            let Some(push_message::Body::GitLane(GitLaneTransfer {
2777                body: Some(git_lane_transfer::Body::Pack(pack)),
2778            })) = chunk.body
2779            else {
2780                panic!("expected git pack chunk");
2781            };
2782            pack_bytes.extend_from_slice(&pack.pack_chunk);
2783            chunks.push(pack);
2784        }
2785        assert!(!chunks.is_empty());
2786        assert!(chunks.last().expect("pack chunk").is_final_chunk);
2787        assert_eq!(pack_bytes.len() as u64, pack.pack_size);
2788        let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
2789            .expect("generated pack should index");
2790        assert_eq!(indexed.pack_id.as_bytes(), pack.pack_id.as_slice());
2791        assert_eq!(indexed.objects.len(), 3);
2792
2793        let state = ChangeId::from_bytes([9u8; 16]);
2794        let checkpoint = GitCheckpointRecord {
2795            change_id: state.to_string_full(),
2796            git_commit: commit_oid.to_hex(),
2797            summary: "checkpoint".to_string(),
2798            committed_at: "2026-06-25T00:00:00Z".to_string(),
2799        };
2800        let ref_message =
2801            git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
2802        let Some(push_message::Body::GitLane(GitLaneTransfer {
2803            body: Some(git_lane_transfer::Body::RefUpdate(update)),
2804        })) = ref_message.body
2805        else {
2806            panic!("expected git ref update message");
2807        };
2808        assert_eq!(update.name, "refs/heads/main");
2809        assert_eq!(update.kind, GitRefKind::Branch as i32);
2810        assert_eq!(update.target_oid.as_ref(), commit_oid.as_bytes());
2811        let checkpoint = update.checkpoint.expect("checkpoint");
2812        assert_eq!(checkpoint.heddle_change_id.as_ref(), state.as_bytes());
2813        assert_eq!(checkpoint.git_commit_oid.as_ref(), commit_oid.as_bytes());
2814        assert_eq!(checkpoint.thread, "main");
2815    }
2816
2817    #[test]
2818    fn git_ref_expectation_marks_missing_when_remote_has_no_git_revision() {
2819        let state = ChangeId::from_bytes([9u8; 16]);
2820        let commit_oid = GitObjectId::from_hex(
2821            sley::ObjectFormat::Sha1,
2822            "0123456789abcdef0123456789abcdef01234567",
2823        )
2824        .expect("oid");
2825        let checkpoint = GitCheckpointRecord {
2826            change_id: state.to_string_full(),
2827            git_commit: commit_oid.to_hex(),
2828            summary: "checkpoint".to_string(),
2829            committed_at: "2026-06-25T00:00:00Z".to_string(),
2830        };
2831        let mut message =
2832            git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
2833
2834        apply_git_ref_expectation(&mut message, "").expect("missing expectation");
2835        let update = git_ref_update_from_message(&message);
2836        assert!(update.expected_missing);
2837        assert!(update.expected_target_oid.is_empty());
2838    }
2839
2840    #[test]
2841    fn git_ref_expectation_uses_remote_git_revision_oid() {
2842        let state = ChangeId::from_bytes([9u8; 16]);
2843        let commit_oid = GitObjectId::from_hex(
2844            sley::ObjectFormat::Sha1,
2845            "0123456789abcdef0123456789abcdef01234567",
2846        )
2847        .expect("oid");
2848        let remote_oid = "89abcdef012345670123456789abcdef01234567";
2849        let checkpoint = GitCheckpointRecord {
2850            change_id: state.to_string_full(),
2851            git_commit: commit_oid.to_hex(),
2852            summary: "checkpoint".to_string(),
2853            committed_at: "2026-06-25T00:00:00Z".to_string(),
2854        };
2855        let mut message =
2856            git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
2857
2858        apply_git_ref_expectation(&mut message, &format!("git:{remote_oid}"))
2859            .expect("git expectation");
2860        let update = git_ref_update_from_message(&message);
2861        assert!(!update.expected_missing);
2862        assert_eq!(hex::encode(update.expected_target_oid.as_ref()), remote_oid);
2863    }
2864
2865    #[test]
2866    fn git_pack_stream_writer_emits_ordered_chunks() {
2867        let (tx, mut rx) = mpsc::channel(4);
2868        let mut writer =
2869            GitPackPushMessageWriter::new(tx, "git-pack:test".to_string(), vec![0x42; 20], 10, 4);
2870        writer.write_all(b"abcdefghij").expect("write pack bytes");
2871        writer.finish().expect("finish pack stream");
2872
2873        let mut chunks = Vec::new();
2874        while let Some(message) = rx.blocking_recv() {
2875            let Some(push_message::Body::GitLane(GitLaneTransfer {
2876                body: Some(git_lane_transfer::Body::Pack(pack)),
2877            })) = message.body
2878            else {
2879                panic!("expected git pack chunk");
2880            };
2881            chunks.push(pack);
2882        }
2883
2884        assert_eq!(chunks.len(), 3);
2885        assert_eq!(chunks[0].offset, 0);
2886        assert_eq!(chunks[0].chunk_index, 0);
2887        assert!(!chunks[0].is_final_chunk);
2888        assert_eq!(chunks[0].pack_chunk.as_ref(), b"abcd");
2889        assert_eq!(chunks[1].offset, 4);
2890        assert_eq!(chunks[1].chunk_index, 1);
2891        assert!(!chunks[1].is_final_chunk);
2892        assert_eq!(chunks[1].pack_chunk.as_ref(), b"efgh");
2893        assert_eq!(chunks[2].offset, 8);
2894        assert_eq!(chunks[2].chunk_index, 2);
2895        assert!(chunks[2].is_final_chunk);
2896        assert_eq!(chunks[2].pack_chunk.as_ref(), b"ij");
2897        assert!(
2898            chunks
2899                .iter()
2900                .all(|chunk| chunk.pack_id.as_ref() == &[0x42; 20][..])
2901        );
2902    }
2903
2904    #[test]
2905    fn git_pack_pull_install_state_streams_pack_into_sley_store() {
2906        let source_dir = TempDir::new().expect("source tempdir");
2907        let source = sley::Repository::init(source_dir.path()).expect("init source git");
2908        let blob_oid = source.write_blob(b"streamed pull pack\n").expect("blob");
2909        let pack = sley::plumbing::sley_odb::build_reachable_pack(
2910            source.objects().as_ref(),
2911            source.object_format(),
2912            [blob_oid],
2913            &HashSet::new(),
2914        )
2915        .expect("build pack")
2916        .expect("reachable pack");
2917
2918        let dest_dir = TempDir::new().expect("dest tempdir");
2919        let dest = sley::Repository::init(dest_dir.path()).expect("init dest git");
2920        let mut state = GitPackPullInstallState::default();
2921        let chunk_size = 7usize;
2922        let mut offset = 0u64;
2923        for (chunk_index, chunk) in pack.pack.chunks(chunk_size).enumerate() {
2924            let next_offset = offset + chunk.len() as u64;
2925            state
2926                .receive_chunk(
2927                    &dest,
2928                    GitPackTransfer {
2929                        transfer_id: "git-pack:test".to_string(),
2930                        offset,
2931                        chunk_index: chunk_index as u32,
2932                        is_final_chunk: next_offset == pack.pack.len() as u64,
2933                        pack_size: pack.pack.len() as u64,
2934                        pack_chunk: chunk.to_vec().into(),
2935                        pack_id: pack.checksum.as_bytes().to_vec().into(),
2936                    },
2937                )
2938                .expect("receive chunk");
2939            offset = next_offset;
2940        }
2941
2942        state.ensure_idle().expect("stream should finish");
2943        let object = dest.read_object(&blob_oid).expect("read installed object");
2944        assert_eq!(object.body.as_slice(), b"streamed pull pack\n");
2945    }
2946
2947    #[test]
2948    fn accept_git_lane_pull_transfer_errors_for_non_overlay_repo() {
2949        let (_dir, repo) = temp_repo();
2950        assert_ne!(
2951            repo.capability(),
2952            RepositoryCapability::GitOverlay,
2953            "init_default repo must be non-GitOverlay for this guard test",
2954        );
2955        let mut git_repo = None;
2956        let mut state = GitPackPullInstallState::default();
2957        let transfer = GitLaneTransfer {
2958            body: Some(git_lane_transfer::Body::Pack(GitPackTransfer {
2959                transfer_id: "git-pack:test".to_string(),
2960                offset: 0,
2961                chunk_index: 0,
2962                is_final_chunk: true,
2963                pack_size: 0,
2964                pack_chunk: Vec::new().into(),
2965                pack_id: Vec::new().into(),
2966            })),
2967        };
2968        let err = accept_git_lane_pull_transfer(&repo, &mut git_repo, &mut state, transfer)
2969            .expect_err("git-lane pull to a non-GitOverlay repo must fail loud, not silently drop");
2970        assert!(
2971            matches!(err, ProtocolError::InvalidState(_)),
2972            "expected InvalidState protocol error, got {err:?}",
2973        );
2974    }
2975
2976    fn git_ref_update_from_message(message: &PushMessage) -> &GitRefUpdateTransfer {
2977        let Some(push_message::Body::GitLane(GitLaneTransfer {
2978            body: Some(git_lane_transfer::Body::RefUpdate(update)),
2979        })) = message.body.as_ref()
2980        else {
2981            panic!("expected git ref update message");
2982        };
2983        update
2984    }
2985
2986    fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
2987        let hex = hash.to_hex();
2988        let (prefix, rest) = hex.split_at(2);
2989        repo.heddle_dir()
2990            .join("objects")
2991            .join("trees")
2992            .join(prefix)
2993            .join(rest)
2994    }
2995
2996    fn sample_redaction(blob: ContentHash) -> Redaction {
2997        Redaction {
2998            redacted_blob: blob,
2999            state: ChangeId::from_bytes([1u8; 16]),
3000            path: "config/secrets.toml".into(),
3001            reason: "leaked credential".into(),
3002            redactor: Principal {
3003                name: "Grace Hopper".into(),
3004                email: "grace@example.com".into(),
3005            },
3006            redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
3007            signature: None,
3008            purged_at: None,
3009            supersedes: None,
3010        }
3011    }
3012
3013    fn sample_state_visibility(state: ChangeId) -> StateVisibility {
3014        StateVisibility {
3015            state,
3016            tier: VisibilityTier::Restricted {
3017                scope_label: "security-embargo".into(),
3018            },
3019            embargo_until: None,
3020            declarer: Principal {
3021                name: "Grace Hopper".into(),
3022                email: "grace@example.com".into(),
3023            },
3024            declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
3025            signature: None,
3026            supersedes: None,
3027        }
3028    }
3029
3030    #[test]
3031    fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
3032        for obj_type in wire::native_pack_excluded_object_types() {
3033            assert!(
3034                is_out_of_pack_transfer_object_type(*obj_type),
3035                "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
3036            );
3037        }
3038    }
3039
3040    #[test]
3041    fn native_pack_required_tracks_packable_pull_wants() {
3042        let blob = sample_blob();
3043        let state = ChangeId::from_bytes([9u8; 16]);
3044
3045        let sidecar_only = HashMap::from([(
3046            PackObjectId::ChangeId(state),
3047            vec![ObjectType::StateVisibility],
3048        )]);
3049        assert!(!native_pack_required_for_pull(false, &sidecar_only));
3050
3051        let redaction_only =
3052            HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Redaction])]);
3053        assert!(!native_pack_required_for_pull(false, &redaction_only));
3054
3055        let packable = HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Blob])]);
3056        assert!(native_pack_required_for_pull(false, &packable));
3057
3058        let state_with_sidecar = HashMap::from([(
3059            PackObjectId::ChangeId(state),
3060            vec![ObjectType::State, ObjectType::StateVisibility],
3061        )]);
3062        assert!(native_pack_required_for_pull(false, &state_with_sidecar));
3063        assert!(native_pack_required_for_pull(true, &HashMap::new()));
3064    }
3065
3066    #[test]
3067    fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
3068        let (_dir, repo) = temp_repo();
3069        let state = ChangeId::from_bytes([9u8; 16]);
3070        let plan = plan_pull_wants(
3071            &repo,
3072            &state,
3073            false,
3074            vec![
3075                object_descriptor_with_status(
3076                    &state_info(state),
3077                    ObjectAvailabilityStatus::Missing,
3078                    "missing state",
3079                ),
3080                object_descriptor_with_status(
3081                    &state_visibility_info(state),
3082                    ObjectAvailabilityStatus::Missing,
3083                    "missing state visibility",
3084                ),
3085            ],
3086            false,
3087        )
3088        .expect("plan pull wants");
3089
3090        let wanted = plan
3091            .wanted_types
3092            .get(&PackObjectId::ChangeId(state))
3093            .expect("same ChangeId want entry");
3094        assert_eq!(
3095            wanted.as_slice(),
3096            &[ObjectType::State, ObjectType::StateVisibility]
3097        );
3098        assert!(native_pack_required_for_pull(
3099            plan.want_full_closure,
3100            &plan.wanted_types
3101        ));
3102    }
3103
3104    #[derive(Clone)]
3105    struct SidecarOnlyPullService {
3106        state: ChangeId,
3107        state_visibility_blob: Vec<u8>,
3108    }
3109
3110    #[tonic::async_trait]
3111    impl RepoSyncService for SidecarOnlyPullService {
3112        async fn list_refs(
3113            &self,
3114            _request: tonic::Request<ListRefsRequest>,
3115        ) -> Result<Response<ListRefsResponse>, Status> {
3116            Ok(Response::new(ListRefsResponse::default()))
3117        }
3118
3119        async fn update_ref(
3120            &self,
3121            _request: tonic::Request<UpdateRefRequest>,
3122        ) -> Result<Response<UpdateRefResponse>, Status> {
3123            Ok(Response::new(UpdateRefResponse::default()))
3124        }
3125
3126        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
3127
3128        async fn push(
3129            &self,
3130            _request: tonic::Request<tonic::Streaming<PushMessage>>,
3131        ) -> Result<Response<Self::PushStream>, Status> {
3132            let (_tx, rx) = mpsc::channel(1);
3133            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
3134                rx,
3135            )))
3136        }
3137
3138        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
3139
3140        async fn pull(
3141            &self,
3142            request: tonic::Request<tonic::Streaming<PullMessage>>,
3143        ) -> Result<Response<Self::PullStream>, Status> {
3144            let state = self.state;
3145            let state_visibility_blob = self.state_visibility_blob.clone();
3146            let (tx, rx) = mpsc::channel(4);
3147
3148            tokio::spawn(async move {
3149                let mut inbound = request.into_inner();
3150                match inbound.message().await {
3151                    Ok(Some(PullMessage {
3152                        body: Some(pull_message::Body::Request(_)),
3153                    })) => {}
3154                    other => {
3155                        let _ = tx
3156                            .send(Err(Status::invalid_argument(format!(
3157                                "expected pull request, got {other:?}"
3158                            ))))
3159                            .await;
3160                        return;
3161                    }
3162                }
3163
3164                let descriptor = object_descriptor_with_status(
3165                    &state_visibility_info(state),
3166                    ObjectAvailabilityStatus::Missing,
3167                    "missing state visibility",
3168                );
3169                let ready = PullMessage {
3170                    body: Some(pull_message::Body::Ready(PullReady {
3171                        remote_state: state.to_string_full(),
3172                        objects_to_fetch: vec![descriptor],
3173                        transfer: None,
3174                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
3175                        missing_objects: Vec::new(),
3176                        full_closure_available: false,
3177                        object_count: 1,
3178                        remote_revision_address: native_revision_address(state),
3179                    })),
3180                };
3181                if tx.send(Ok(ready)).await.is_err() {
3182                    return;
3183                }
3184
3185                match inbound.message().await {
3186                    Ok(Some(PullMessage {
3187                        body: Some(pull_message::Body::Want(want)),
3188                    })) if !want.want_full_closure
3189                        && want.objects.len() == 1
3190                        && want.objects[0].object_type == "state_visibility" => {}
3191                    other => {
3192                        let _ = tx
3193                            .send(Err(Status::invalid_argument(format!(
3194                                "expected sidecar-only want, got {other:?}"
3195                            ))))
3196                            .await;
3197                        return;
3198                    }
3199                }
3200
3201                let transfer = PullMessage {
3202                    body: Some(pull_message::Body::StateVisibility(
3203                        StateVisibilityTransfer {
3204                            state_id: state.to_string_full(),
3205                            state_visibility_blob: state_visibility_blob.into(),
3206                        },
3207                    )),
3208                };
3209                if tx.send(Ok(transfer)).await.is_err() {
3210                    return;
3211                }
3212
3213                let complete = PullMessage {
3214                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
3215                        success: true,
3216                        new_state: state.to_string_full(),
3217                        error: String::new(),
3218                        transfer: Some(TransferCheckpoint {
3219                            transfer_id: "sidecar-only-test".to_string(),
3220                            transport_mode: TransportMode::NativePack as i32,
3221                            resume_offset: 0,
3222                            chunk_index: 0,
3223                            checkpoint: b"heddle-markers-v1\n".to_vec(),
3224                            is_complete: true,
3225                        }),
3226                        new_revision_address: native_revision_address(state),
3227                    })),
3228                };
3229                let _ = tx.send(Ok(complete)).await;
3230            });
3231
3232            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
3233                rx,
3234            )))
3235        }
3236    }
3237
3238    async fn connect_sidecar_only_service(
3239        service: SidecarOnlyPullService,
3240    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3241        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3242            Ok(listener) => listener,
3243            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3244                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3245                return None;
3246            }
3247            Err(err) => panic!("bind test server: {err}"),
3248        };
3249        let addr = listener.local_addr().expect("local addr");
3250        let incoming = futures::stream::unfold(listener, |listener| async {
3251            match listener.accept().await {
3252                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3253                Err(err) => Some((Err(err), listener)),
3254            }
3255        });
3256
3257        let handle = tokio::spawn(async move {
3258            Server::builder()
3259                .add_service(RepoSyncServiceServer::new(service))
3260                .serve_with_incoming(incoming)
3261                .await
3262                .expect("serve sidecar-only test service");
3263        });
3264
3265        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3266            .await
3267            .expect("connect client");
3268        Some((client, handle))
3269    }
3270
3271    #[tokio::test]
3272    async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
3273        let (_dir, repo) = temp_repo();
3274        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3275        let state = State::new_snapshot(
3276            tree_hash,
3277            vec![],
3278            Attribution::human(Principal {
3279                name: "Grace Hopper".into(),
3280                email: "grace@example.com".into(),
3281            }),
3282        );
3283        let state_id = state.change_id;
3284        repo.store().put_state(&state).expect("put state");
3285        assert!(
3286            repo.get_state_visibility_bytes_for_state(&state_id)
3287                .expect("load local sidecar")
3288                .is_none(),
3289            "test starts with state present and StateVisibility sidecar absent"
3290        );
3291
3292        let state_visibility_blob =
3293            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3294                .encode()
3295                .expect("encode state visibility blob");
3296        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3297            state: state_id,
3298            state_visibility_blob,
3299        })
3300        .await
3301        else {
3302            return;
3303        };
3304
3305        let exchange = tokio::time::timeout(
3306            Duration::from_secs(5),
3307            client.pull_exchange(
3308                &repo,
3309                "owner/repo",
3310                "main",
3311                PullOptions {
3312                    local_thread: None,
3313                    depth: None,
3314                    target_state: Some(state_id),
3315                    materialization: PullMaterialization::Full,
3316                },
3317            ),
3318        )
3319        .await
3320        .expect("sidecar-only pull must not hang waiting for native pack")
3321        .expect("sidecar-only pull succeeds");
3322        server.abort();
3323
3324        assert!(exchange.result.success);
3325        assert_eq!(exchange.object_count, 0);
3326        assert_eq!(exchange.profile.pack_bytes_received, 0);
3327        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3328        assert!(
3329            repo.get_state_visibility_for_state(&state_id)
3330                .expect("load accepted sidecar")
3331                .has_record(),
3332            "pull must accept the out-of-pack StateVisibility sidecar"
3333        );
3334    }
3335
3336    // A sidecar blob larger than tonic's 4 MiB default decode limit but well
3337    // under the 64 MiB receive cap must still decode + install: the raised
3338    // `max_decoding_message_size` is the bound, and it isn't set too tight.
3339    #[tokio::test]
3340    async fn legitimate_large_sidecar_blob_decodes_and_installs() {
3341        let (_dir, repo) = temp_repo();
3342        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3343        let state = State::new_snapshot(
3344            tree_hash,
3345            vec![],
3346            Attribution::human(Principal {
3347                name: "Grace Hopper".into(),
3348                email: "grace@example.com".into(),
3349            }),
3350        );
3351        let state_id = state.change_id;
3352        repo.store().put_state(&state).expect("put state");
3353
3354        // ~8 MiB blob: above tonic's 4 MiB default (which would reject at
3355        // decode without the raised limit), below the 64 MiB sidecar cap.
3356        let mut record = sample_state_visibility(state_id);
3357        record.tier = VisibilityTier::Restricted {
3358            scope_label: "x".repeat(8 * 1024 * 1024),
3359        };
3360        let state_visibility_blob = StateVisibilityBlob::new(vec![record])
3361            .encode()
3362            .expect("encode large state visibility blob");
3363        assert!(
3364            state_visibility_blob.len() > 4 * 1024 * 1024,
3365            "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
3366        );
3367        assert!(
3368            (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
3369            "blob must stay within the legitimate sidecar receive cap"
3370        );
3371
3372        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3373            state: state_id,
3374            state_visibility_blob,
3375        })
3376        .await
3377        else {
3378            return;
3379        };
3380
3381        let exchange = tokio::time::timeout(
3382            Duration::from_secs(30),
3383            client.pull_exchange(
3384                &repo,
3385                "owner/repo",
3386                "main",
3387                PullOptions {
3388                    local_thread: None,
3389                    depth: None,
3390                    target_state: Some(state_id),
3391                    materialization: PullMaterialization::Full,
3392                },
3393            ),
3394        )
3395        .await
3396        .expect("large-sidecar pull must not hang")
3397        .expect("large but legitimate sidecar pull succeeds");
3398        server.abort();
3399
3400        assert!(exchange.result.success);
3401        assert!(
3402            repo.get_state_visibility_for_state(&state_id)
3403                .expect("load accepted sidecar")
3404                .has_record(),
3405            "pull must accept a legitimately-large StateVisibility sidecar"
3406        );
3407    }
3408
3409    // A sidecar blob beyond the pull-stream decode limit must be rejected at
3410    // the gRPC decode boundary — before its `Vec<u8>` is materialized — not by
3411    // the cheaper post-decode `check_received_transfer_blob_size` guard.
3412    #[tokio::test]
3413    async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
3414        let (_dir, repo) = temp_repo();
3415        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3416        let state = State::new_snapshot(
3417            tree_hash,
3418            vec![],
3419            Attribution::human(Principal {
3420                name: "Grace Hopper".into(),
3421                email: "grace@example.com".into(),
3422            }),
3423        );
3424        let state_id = state.change_id;
3425        repo.store().put_state(&state).expect("put state");
3426
3427        // One byte past the decode limit. Content is irrelevant: decode is
3428        // refused before the blob is ever handed to the accept path.
3429        let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
3430        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3431            state: state_id,
3432            state_visibility_blob: oversized,
3433        })
3434        .await
3435        else {
3436            return;
3437        };
3438
3439        let result = tokio::time::timeout(
3440            Duration::from_secs(30),
3441            client.pull_exchange(
3442                &repo,
3443                "owner/repo",
3444                "main",
3445                PullOptions {
3446                    local_thread: None,
3447                    depth: None,
3448                    target_state: Some(state_id),
3449                    materialization: PullMaterialization::Full,
3450                },
3451            ),
3452        )
3453        .await
3454        .expect("oversized-sidecar pull must not hang");
3455        server.abort();
3456
3457        let err = match result {
3458            Err(err) => err,
3459            Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
3460        };
3461        let message = err.to_string();
3462        assert!(
3463            !message.contains("exceeds receive size limit"),
3464            "rejection must come from the decode-size limit, before the post-decode check: {message}"
3465        );
3466        assert!(
3467            repo.get_state_visibility_for_state(&state_id)
3468                .expect("load sidecar")
3469                .latest()
3470                .expect("resolve visibility")
3471                .is_none(),
3472            "an oversized sidecar must never be installed"
3473        );
3474    }
3475
3476    #[derive(Clone)]
3477    struct StateAndVisibilityPullService {
3478        state: ChangeId,
3479        pack_bundle: wire::NativePackBundle,
3480        state_visibility_blob: Vec<u8>,
3481    }
3482
3483    #[tonic::async_trait]
3484    impl RepoSyncService for StateAndVisibilityPullService {
3485        async fn list_refs(
3486            &self,
3487            _request: tonic::Request<ListRefsRequest>,
3488        ) -> Result<Response<ListRefsResponse>, Status> {
3489            Ok(Response::new(ListRefsResponse::default()))
3490        }
3491
3492        async fn update_ref(
3493            &self,
3494            _request: tonic::Request<UpdateRefRequest>,
3495        ) -> Result<Response<UpdateRefResponse>, Status> {
3496            Ok(Response::new(UpdateRefResponse::default()))
3497        }
3498
3499        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
3500
3501        async fn push(
3502            &self,
3503            _request: tonic::Request<tonic::Streaming<PushMessage>>,
3504        ) -> Result<Response<Self::PushStream>, Status> {
3505            let (_tx, rx) = mpsc::channel(1);
3506            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
3507                rx,
3508            )))
3509        }
3510
3511        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
3512
3513        async fn pull(
3514            &self,
3515            request: tonic::Request<tonic::Streaming<PullMessage>>,
3516        ) -> Result<Response<Self::PullStream>, Status> {
3517            let state = self.state;
3518            let pack_bundle = self.pack_bundle.clone();
3519            let state_visibility_blob = self.state_visibility_blob.clone();
3520            let (tx, rx) = mpsc::channel(8);
3521
3522            tokio::spawn(async move {
3523                let mut inbound = request.into_inner();
3524                match inbound.message().await {
3525                    Ok(Some(PullMessage {
3526                        body: Some(pull_message::Body::Request(_)),
3527                    })) => {}
3528                    other => {
3529                        let _ = tx
3530                            .send(Err(Status::invalid_argument(format!(
3531                                "expected pull request, got {other:?}"
3532                            ))))
3533                            .await;
3534                        return;
3535                    }
3536                }
3537
3538                let ready = PullMessage {
3539                    body: Some(pull_message::Body::Ready(PullReady {
3540                        remote_state: state.to_string_full(),
3541                        objects_to_fetch: vec![
3542                            object_descriptor_with_status(
3543                                &state_info(state),
3544                                ObjectAvailabilityStatus::Missing,
3545                                "missing state",
3546                            ),
3547                            object_descriptor_with_status(
3548                                &state_visibility_info(state),
3549                                ObjectAvailabilityStatus::Missing,
3550                                "missing state visibility",
3551                            ),
3552                        ],
3553                        transfer: None,
3554                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
3555                        missing_objects: Vec::new(),
3556                        full_closure_available: false,
3557                        object_count: 2,
3558                        remote_revision_address: native_revision_address(state),
3559                    })),
3560                };
3561                if tx.send(Ok(ready)).await.is_err() {
3562                    return;
3563                }
3564
3565                match inbound.message().await {
3566                    Ok(Some(PullMessage {
3567                        body: Some(pull_message::Body::Want(want)),
3568                    })) if !want.want_full_closure
3569                        && want.objects.len() == 2
3570                        && want
3571                            .objects
3572                            .iter()
3573                            .any(|object| object.object_type == "state")
3574                        && want
3575                            .objects
3576                            .iter()
3577                            .any(|object| object.object_type == "state_visibility") => {}
3578                    other => {
3579                        let _ = tx
3580                            .send(Err(Status::invalid_argument(format!(
3581                                "expected state + sidecar wants, got {other:?}"
3582                            ))))
3583                            .await;
3584                        return;
3585                    }
3586                }
3587
3588                for message in
3589                    encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
3590                {
3591                    if tx.send(Ok(message)).await.is_err() {
3592                        return;
3593                    }
3594                }
3595
3596                let transfer = PullMessage {
3597                    body: Some(pull_message::Body::StateVisibility(
3598                        StateVisibilityTransfer {
3599                            state_id: state.to_string_full(),
3600                            state_visibility_blob: state_visibility_blob.into(),
3601                        },
3602                    )),
3603                };
3604                if tx.send(Ok(transfer)).await.is_err() {
3605                    return;
3606                }
3607
3608                let complete = PullMessage {
3609                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
3610                        success: true,
3611                        new_state: state.to_string_full(),
3612                        error: String::new(),
3613                        transfer: Some(TransferCheckpoint {
3614                            transfer_id: "state-and-visibility-test".to_string(),
3615                            transport_mode: TransportMode::NativePack as i32,
3616                            resume_offset: 0,
3617                            chunk_index: 0,
3618                            checkpoint: b"heddle-markers-v1\n".to_vec(),
3619                            is_complete: true,
3620                        }),
3621                        new_revision_address: native_revision_address(state),
3622                    })),
3623                };
3624                let _ = tx.send(Ok(complete)).await;
3625            });
3626
3627            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
3628                rx,
3629            )))
3630        }
3631    }
3632
3633    fn encode_pull_native_pack_messages(
3634        bundle: &wire::NativePackBundle,
3635        transfer_id: &str,
3636        chunk_size: usize,
3637    ) -> Vec<PullMessage> {
3638        let mut messages = Vec::new();
3639        let chunk_size = chunk_size.max(1);
3640
3641        let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
3642        for chunk_index in 0..pack_total_chunks.max(1) {
3643            let Some((start, len)) =
3644                wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
3645            else {
3646                break;
3647            };
3648            messages.push(PullMessage {
3649                body: Some(pull_message::Body::Pack(PackChunk {
3650                    stream_kind: PackStreamKind::Pack as i32,
3651                    data: bundle.pack_data[start..start + len].to_vec().into(),
3652                    transfer: Some(TransferCheckpoint {
3653                        transfer_id: transfer_id.to_string(),
3654                        transport_mode: TransportMode::NativePack as i32,
3655                        resume_offset: start as u64,
3656                        chunk_index: chunk_index as u32,
3657                        checkpoint: Vec::new(),
3658                        is_complete: chunk_index + 1 == pack_total_chunks,
3659                    }),
3660                    chunk_length: len as u32,
3661                    is_final_chunk: chunk_index + 1 == pack_total_chunks,
3662                })),
3663            });
3664        }
3665
3666        let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
3667        for chunk_index in 0..index_total_chunks.max(1) {
3668            let Some((start, len)) =
3669                wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
3670            else {
3671                break;
3672            };
3673            messages.push(PullMessage {
3674                body: Some(pull_message::Body::Pack(PackChunk {
3675                    stream_kind: PackStreamKind::Index as i32,
3676                    data: bundle.index_data[start..start + len].to_vec().into(),
3677                    transfer: Some(TransferCheckpoint {
3678                        transfer_id: transfer_id.to_string(),
3679                        transport_mode: TransportMode::NativePack as i32,
3680                        resume_offset: start as u64,
3681                        chunk_index: chunk_index as u32,
3682                        checkpoint: Vec::new(),
3683                        is_complete: chunk_index + 1 == index_total_chunks,
3684                    }),
3685                    chunk_length: len as u32,
3686                    is_final_chunk: chunk_index + 1 == index_total_chunks,
3687                })),
3688            });
3689        }
3690
3691        messages
3692    }
3693
3694    async fn connect_state_and_visibility_service(
3695        service: StateAndVisibilityPullService,
3696    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3697        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3698            Ok(listener) => listener,
3699            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3700                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3701                return None;
3702            }
3703            Err(err) => panic!("bind test server: {err}"),
3704        };
3705        let addr = listener.local_addr().expect("local addr");
3706        let incoming = futures::stream::unfold(listener, |listener| async {
3707            match listener.accept().await {
3708                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3709                Err(err) => Some((Err(err), listener)),
3710            }
3711        });
3712
3713        let handle = tokio::spawn(async move {
3714            Server::builder()
3715                .add_service(RepoSyncServiceServer::new(service))
3716                .serve_with_incoming(incoming)
3717                .await
3718                .expect("serve state-and-visibility test service");
3719        });
3720
3721        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3722            .await
3723            .expect("connect client");
3724        Some((client, handle))
3725    }
3726
3727    #[tokio::test]
3728    async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
3729        let (_source_dir, source_repo) = temp_repo();
3730        let (_target_dir, target_repo) = temp_repo();
3731        let tree_hash = source_repo
3732            .store()
3733            .put_tree(&Tree::new())
3734            .expect("put tree");
3735        let state = State::new_snapshot(
3736            tree_hash,
3737            vec![],
3738            Attribution::human(Principal {
3739                name: "Grace Hopper".into(),
3740                email: "grace@example.com".into(),
3741            }),
3742        );
3743        let state_id = state.change_id;
3744        source_repo
3745            .store()
3746            .put_state(&state)
3747            .expect("put source state");
3748        let state_visibility_blob =
3749            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3750                .encode()
3751                .expect("encode state visibility blob");
3752        source_repo
3753            .accept_wire_state_visibility(state_id, &state_visibility_blob)
3754            .expect("put source state visibility");
3755        let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
3756            .expect("build state pack");
3757
3758        assert!(
3759            target_repo
3760                .store()
3761                .get_state(&state_id)
3762                .expect("load target state")
3763                .is_none(),
3764            "test starts with state absent"
3765        );
3766        assert!(
3767            target_repo
3768                .get_state_visibility_bytes_for_state(&state_id)
3769                .expect("load target sidecar")
3770                .is_none(),
3771            "test starts with StateVisibility sidecar absent"
3772        );
3773
3774        let Some((mut client, server)) =
3775            connect_state_and_visibility_service(StateAndVisibilityPullService {
3776                state: state_id,
3777                pack_bundle,
3778                state_visibility_blob,
3779            })
3780            .await
3781        else {
3782            return;
3783        };
3784
3785        let exchange = tokio::time::timeout(
3786            Duration::from_secs(5),
3787            client.pull_exchange(
3788                &target_repo,
3789                "owner/repo",
3790                "main",
3791                PullOptions {
3792                    local_thread: None,
3793                    depth: None,
3794                    target_state: Some(state_id),
3795                    materialization: PullMaterialization::Full,
3796                },
3797            ),
3798        )
3799        .await
3800        .expect("state + sidecar pull must not hang waiting for native pack")
3801        .expect("state + sidecar pull succeeds");
3802        server.abort();
3803
3804        assert!(exchange.result.success);
3805        assert_eq!(exchange.object_count, 1);
3806        assert!(exchange.profile.pack_bytes_received > 0);
3807        assert_eq!(exchange.profile.object_mix.states, 1);
3808        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3809        assert!(
3810            target_repo
3811                .store()
3812                .get_state(&state_id)
3813                .expect("load installed state")
3814                .is_some(),
3815            "native pack must install the State"
3816        );
3817        assert!(
3818            target_repo
3819                .get_state_visibility_for_state(&state_id)
3820                .expect("load accepted sidecar")
3821                .has_record(),
3822            "pull must accept the out-of-pack StateVisibility sidecar"
3823        );
3824    }
3825
3826    #[test]
3827    fn collect_missing_blobs_treats_absent_tree_as_empty() {
3828        let (_dir, repo) = temp_repo();
3829        let absent_tree = ContentHash::from_bytes([99u8; 32]);
3830
3831        let missing =
3832            collect_missing_blobs(&repo, &absent_tree).expect("absent tree is not an error");
3833
3834        assert!(missing.is_empty());
3835    }
3836
3837    #[test]
3838    fn collect_missing_blobs_reports_only_genuinely_missing_blobs() {
3839        let (_dir, repo) = temp_repo();
3840        let present_blob = Blob::from("already local");
3841        let present_hash = repo.store().put_blob(&present_blob).expect("put blob");
3842        let missing_hash = ContentHash::from_bytes([42u8; 32]);
3843        let tree = Tree::from_entries(vec![
3844            TreeEntry::file("local.txt", present_hash, false).expect("present entry"),
3845            TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry"),
3846        ]);
3847        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
3848
3849        let missing = collect_missing_blobs(&repo, &tree_hash).expect("collect missing blobs");
3850
3851        assert_eq!(missing, vec![missing_hash]);
3852    }
3853
3854    #[test]
3855    fn collect_missing_blobs_reports_corrupt_tree_read() {
3856        let (_dir, repo) = temp_repo();
3857        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3858        std::fs::write(loose_tree_path(&repo, &tree_hash), [0xc1]).expect("corrupt tree");
3859        repo.store().clear_recent_caches();
3860
3861        let err = collect_missing_blobs(&repo, &tree_hash).expect_err("corrupt tree must fail");
3862
3863        assert!(matches!(err, ProtocolError::InvalidState(_)));
3864        assert!(
3865            err.to_string().contains(&format!(
3866                "load tree {} while collecting lazy hydration missing blobs",
3867                tree_hash.to_hex()
3868            )),
3869            "unexpected error: {err}"
3870        );
3871    }
3872
3873    #[test]
3874    fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
3875        let (_dir, repo) = temp_repo();
3876        let blob = sample_blob();
3877        repo.put_redaction(sample_redaction(blob))
3878            .expect("put redaction");
3879        let expected_bytes = repo
3880            .store()
3881            .get_redactions_bytes_for_blob(&blob)
3882            .expect("load sidecar")
3883            .expect("sidecar exists");
3884
3885        let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");
3886
3887        let Some(push_message::Body::Redaction(transfer)) = message.body else {
3888            panic!("expected redaction transfer");
3889        };
3890        assert_eq!(transfer.blob_hash, blob.to_hex());
3891        assert_eq!(transfer.redactions_blob, expected_bytes);
3892    }
3893
3894    #[test]
3895    fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
3896        let (_dir, repo) = temp_repo();
3897        let blob = sample_blob();
3898
3899        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("missing sidecar");
3900
3901        assert!(matches!(err, ProtocolError::InvalidState(_)));
3902        assert!(
3903            err.to_string().contains(&format!(
3904                "server wants redaction for blob {} but sender has no sidecar",
3905                blob.to_hex()
3906            )),
3907            "unexpected error: {err}"
3908        );
3909    }
3910
3911    #[test]
3912    fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
3913        let (_dir, repo) = temp_repo();
3914        let blob = sample_blob();
3915        let redaction_path = repo
3916            .heddle_dir()
3917            .join("redactions")
3918            .join(format!("{}.bin", blob.to_hex()));
3919        std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");
3920
3921        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("load error");
3922
3923        assert!(matches!(err, ProtocolError::InvalidState(_)));
3924        assert!(
3925            err.to_string()
3926                .contains(&format!("load redactions sidecar for {}:", blob.to_hex())),
3927            "unexpected error: {err}"
3928        );
3929    }
3930
3931    #[test]
3932    fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
3933        let (_dir, repo) = temp_repo();
3934        let state = ChangeId::from_bytes([17u8; 16]);
3935        repo.put_state_visibility(sample_state_visibility(state))
3936            .expect("put state visibility");
3937        let expected_bytes = repo
3938            .get_state_visibility_bytes_for_state(&state)
3939            .expect("load sidecar")
3940            .expect("sidecar exists");
3941
3942        let message =
3943            state_visibility_push_message(&repo, state_visibility_info(state)).expect("message");
3944
3945        let Some(push_message::Body::StateVisibility(transfer)) = message.body else {
3946            panic!("expected state visibility transfer");
3947        };
3948        assert_eq!(transfer.state_id, state.to_string_full());
3949        assert_eq!(transfer.state_visibility_blob, expected_bytes);
3950    }
3951
3952    // ---- bare-pull head advertisement (delta sync) ----
3953
3954    fn sample_attribution() -> Attribution {
3955        Attribution::human(Principal {
3956            name: "Grace Hopper".into(),
3957            email: "grace@example.com".into(),
3958        })
3959    }
3960
3961    /// Put a state whose tree holds one blob into `repo`'s store. Returns the
3962    /// state id. With `parents`, builds a child on top of an existing state.
3963    fn put_state_with_blob(
3964        repo: &Repository,
3965        contents: &str,
3966        parents: Vec<ChangeId>,
3967    ) -> ChangeId {
3968        let blob = Blob::from(contents);
3969        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
3970        let tree = Tree::from_entries(vec![
3971            TreeEntry::file(format!("{contents}.txt"), blob_hash, false).expect("tree entry"),
3972        ]);
3973        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
3974        let state = State::new_snapshot(tree_hash, parents, sample_attribution());
3975        let state_id = state.change_id;
3976        repo.store().put_state(&state).expect("put state");
3977        state_id
3978    }
3979
3980    #[test]
3981    fn locally_complete_pull_head_advertises_fully_present_thread_head() {
3982        let (_dir, repo) = temp_repo();
3983        let head = put_state_with_blob(&repo, "alpha", vec![]);
3984        repo.refs()
3985            .set_thread(&ThreadName::from("main"), &head)
3986            .expect("set thread");
3987
3988        let advertised =
3989            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
3990        assert_eq!(
3991            advertised,
3992            Some(head),
3993            "a thread head whose full closure is present locally must be advertised"
3994        );
3995    }
3996
3997    #[test]
3998    fn locally_complete_pull_head_skips_unknown_thread() {
3999        // Unseeded local repo: the bare-pull thread has no local head, so
4000        // there is nothing to advertise and the pull falls back to a full
4001        // closure.
4002        let (_dir, repo) = temp_repo_unseeded();
4003        let advertised =
4004            locally_complete_pull_head(&repo, "nonexistent", None).expect("completeness check");
4005        assert_eq!(advertised, None);
4006    }
4007
4008    #[test]
4009    fn locally_complete_pull_head_skips_when_target_state_override_in_play() {
4010        let (_dir, repo) = temp_repo();
4011        let head = put_state_with_blob(&repo, "alpha", vec![]);
4012        repo.refs()
4013            .set_thread(&ThreadName::from("main"), &head)
4014            .expect("set thread");
4015
4016        // A fetch_state-style override drives the want plan directly; the
4017        // thread head is unrelated to what's being fetched.
4018        let advertised = locally_complete_pull_head(&repo, "main", Some(head))
4019            .expect("completeness check");
4020        assert_eq!(advertised, None);
4021    }
4022
4023    #[test]
4024    fn locally_complete_pull_head_skips_repo_with_missing_blobs() {
4025        // A partial/lazy clone records known-missing blobs: it may hold a
4026        // state's metadata while lacking blob content. Advertising such a
4027        // head would make the server omit objects and silently leave us with
4028        // an incomplete repo — so the completeness gate must refuse.
4029        let (_dir, repo) = temp_repo();
4030        let head = put_state_with_blob(&repo, "alpha", vec![]);
4031        repo.refs()
4032            .set_thread(&ThreadName::from("main"), &head)
4033            .expect("set thread");
4034        repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4035            .expect("record missing blob");
4036
4037        let advertised =
4038            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4039        assert_eq!(
4040            advertised, None,
4041            "a repo carrying missing blobs must never advertise a head"
4042        );
4043    }
4044
4045    #[test]
4046    fn locally_complete_pull_head_skips_head_with_incomplete_closure() {
4047        // The thread head's *state* is present, but a parent state in its
4048        // closure is absent. Walking the closure surfaces `ObjectNotFound`,
4049        // so the head must NOT be advertised. This is the dangerous case the
4050        // cardinal correctness constraint guards against.
4051        let (_dir, repo) = temp_repo();
4052        let absent_parent = ChangeId::generate();
4053        let blob = Blob::from("orphan");
4054        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4055        let tree = Tree::from_entries(vec![
4056            TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4057        ]);
4058        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4059        // Child references a parent state that is NOT in the store.
4060        let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4061        let child_id = child.change_id;
4062        repo.store().put_state(&child).expect("put child state");
4063        repo.refs()
4064            .set_thread(&ThreadName::from("main"), &child_id)
4065            .expect("set thread");
4066
4067        let advertised =
4068            locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4069        assert_eq!(
4070            advertised, None,
4071            "a head whose closure has an absent parent state must not be advertised"
4072        );
4073    }
4074
4075    #[test]
4076    fn locally_complete_local_thread_head_advertises_fully_present_thread_head() {
4077        // The explicit `--local-thread` happy path: the named thread's head
4078        // closure is fully local, so it may be advertised (fast delta path).
4079        let (_dir, repo) = temp_repo();
4080        let head = put_state_with_blob(&repo, "alpha", vec![]);
4081        repo.refs()
4082            .set_thread(&ThreadName::from("feature"), &head)
4083            .expect("set thread");
4084
4085        let advertised = locally_complete_local_thread_head(&repo, "feature", None)
4086            .expect("completeness check");
4087        assert_eq!(
4088            advertised,
4089            Some(head),
4090            "an explicit local-thread head whose full closure is present must be advertised"
4091        );
4092    }
4093
4094    #[test]
4095    fn locally_complete_local_thread_head_skips_unknown_thread() {
4096        // `--local-thread` naming a thread with no local head: nothing to
4097        // advertise, falls back to a full closure.
4098        let (_dir, repo) = temp_repo_unseeded();
4099        let advertised = locally_complete_local_thread_head(&repo, "nonexistent", None)
4100            .expect("completeness check");
4101        assert_eq!(advertised, None);
4102    }
4103
4104    #[test]
4105    fn locally_complete_local_thread_head_skips_when_target_state_override_in_play() {
4106        let (_dir, repo) = temp_repo();
4107        let head = put_state_with_blob(&repo, "alpha", vec![]);
4108        repo.refs()
4109            .set_thread(&ThreadName::from("feature"), &head)
4110            .expect("set thread");
4111
4112        // A target-state override drives the want plan directly; the explicit
4113        // thread head is unrelated and must not be advertised.
4114        let advertised = locally_complete_local_thread_head(&repo, "feature", Some(head))
4115            .expect("completeness check");
4116        assert_eq!(advertised, None);
4117    }
4118
4119    #[test]
4120    fn locally_complete_local_thread_head_skips_repo_with_missing_blobs() {
4121        // A partial/lazy clone named via `--local-thread`: holds metadata but
4122        // records known-missing blobs. The gate must refuse.
4123        let (_dir, repo) = temp_repo();
4124        let head = put_state_with_blob(&repo, "alpha", vec![]);
4125        repo.refs()
4126            .set_thread(&ThreadName::from("feature"), &head)
4127            .expect("set thread");
4128        repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4129            .expect("record missing blob");
4130
4131        let advertised = locally_complete_local_thread_head(&repo, "feature", None)
4132            .expect("completeness check");
4133        assert_eq!(
4134            advertised, None,
4135            "a repo carrying missing blobs must never advertise an explicit local-thread head"
4136        );
4137    }
4138
4139    #[test]
4140    fn locally_complete_local_thread_head_skips_head_with_incomplete_closure() {
4141        // The cardinal case for the explicit branch: the named thread's head
4142        // state is present, but a parent state in its closure is absent — an
4143        // interrupted prior pull or partial clone. Advertising this head would
4144        // make the server prune objects we lack. The gate must refuse.
4145        let (_dir, repo) = temp_repo();
4146        let absent_parent = ChangeId::generate();
4147        let blob = Blob::from("orphan");
4148        let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4149        let tree = Tree::from_entries(vec![
4150            TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4151        ]);
4152        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4153        let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4154        let child_id = child.change_id;
4155        repo.store().put_state(&child).expect("put child state");
4156        repo.refs()
4157            .set_thread(&ThreadName::from("feature"), &child_id)
4158            .expect("set thread");
4159
4160        let advertised = locally_complete_local_thread_head(&repo, "feature", None)
4161            .expect("completeness check");
4162        assert_eq!(
4163            advertised, None,
4164            "an explicit local-thread head whose closure has an absent parent must not be advertised"
4165        );
4166    }
4167
4168    /// Mock pull service that mimics the weft server's `exclude_states`
4169    /// contract: it captures the inbound `exclude_states`, fires the
4170    /// zero-delta short-circuit when the remote tip is advertised, and
4171    /// otherwise sends only the objects NOT covered by an advertised
4172    /// (parent) closure.
4173    #[derive(Clone)]
4174    struct DeltaAwarePullService {
4175        remote_state: ChangeId,
4176        /// Object descriptors keyed by the parent state that an advertised
4177        /// `exclude_states` entry would cover. If `exclude_states` contains
4178        /// `remote_state`, the delta is empty (short-circuit). If it contains
4179        /// a known parent, only the child-specific objects are sent. If it
4180        /// contains neither, the full closure is sent.
4181        full_closure: Vec<ObjectInfo>,
4182        delta_objects: Vec<ObjectInfo>,
4183        known_parent: ChangeId,
4184        full_pack: wire::NativePackBundle,
4185        delta_pack: wire::NativePackBundle,
4186        captured_exclude: std::sync::Arc<std::sync::Mutex<Option<Vec<String>>>>,
4187    }
4188
4189    #[tonic::async_trait]
4190    impl RepoSyncService for DeltaAwarePullService {
4191        async fn list_refs(
4192            &self,
4193            _request: tonic::Request<ListRefsRequest>,
4194        ) -> Result<Response<ListRefsResponse>, Status> {
4195            Ok(Response::new(ListRefsResponse::default()))
4196        }
4197
4198        async fn update_ref(
4199            &self,
4200            _request: tonic::Request<UpdateRefRequest>,
4201        ) -> Result<Response<UpdateRefResponse>, Status> {
4202            Ok(Response::new(UpdateRefResponse::default()))
4203        }
4204
4205        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
4206
4207        async fn push(
4208            &self,
4209            _request: tonic::Request<tonic::Streaming<PushMessage>>,
4210        ) -> Result<Response<Self::PushStream>, Status> {
4211            let (_tx, rx) = mpsc::channel(1);
4212            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4213                rx,
4214            )))
4215        }
4216
4217        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
4218
4219        async fn pull(
4220            &self,
4221            request: tonic::Request<tonic::Streaming<PullMessage>>,
4222        ) -> Result<Response<Self::PullStream>, Status> {
4223            let svc = self.clone();
4224            let (tx, rx) = mpsc::channel(16);
4225
4226            tokio::spawn(async move {
4227                let mut inbound = request.into_inner();
4228                let exclude = match inbound.message().await {
4229                    Ok(Some(PullMessage {
4230                        body: Some(pull_message::Body::Request(req)),
4231                    })) => req.exclude_states,
4232                    other => {
4233                        let _ = tx
4234                            .send(Err(Status::invalid_argument(format!(
4235                                "expected pull request, got {other:?}"
4236                            ))))
4237                            .await;
4238                        return;
4239                    }
4240                };
4241                *svc.captured_exclude.lock().unwrap() = Some(exclude.clone());
4242
4243                let remote_full = svc.remote_state.to_string_full();
4244                let parent_full = svc.known_parent.to_string_full();
4245
4246                // Mirror the server contract: subtract advertised closures.
4247                let (objects, pack, short_circuit) = if exclude.contains(&remote_full) {
4248                    // weft#215 zero-delta short-circuit: client is at the tip.
4249                    (Vec::new(), None, true)
4250                } else if exclude.contains(&parent_full) {
4251                    // Client is behind at `known_parent`; send only the delta.
4252                    (svc.delta_objects.clone(), Some(svc.delta_pack.clone()), false)
4253                } else {
4254                    // No usable advertisement; send the full closure.
4255                    (svc.full_closure.clone(), Some(svc.full_pack.clone()), false)
4256                };
4257
4258                let descriptors: Vec<_> = objects
4259                    .iter()
4260                    .map(|info| {
4261                        object_descriptor_with_status(
4262                            info,
4263                            ObjectAvailabilityStatus::Missing,
4264                            "requested by client",
4265                        )
4266                    })
4267                    .collect();
4268
4269                let ready = PullMessage {
4270                    body: Some(pull_message::Body::Ready(PullReady {
4271                        remote_state: remote_full.clone(),
4272                        objects_to_fetch: descriptors,
4273                        transfer: None,
4274                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
4275                        missing_objects: Vec::new(),
4276                        full_closure_available: false,
4277                        object_count: objects.len() as u32,
4278                        remote_revision_address: native_revision_address(svc.remote_state),
4279                    })),
4280                };
4281                if tx.send(Ok(ready)).await.is_err() {
4282                    return;
4283                }
4284
4285                // Expect the client's Want.
4286                match inbound.message().await {
4287                    Ok(Some(PullMessage {
4288                        body: Some(pull_message::Body::Want(_)),
4289                    })) => {}
4290                    other => {
4291                        let _ = tx
4292                            .send(Err(Status::invalid_argument(format!(
4293                                "expected want, got {other:?}"
4294                            ))))
4295                            .await;
4296                        return;
4297                    }
4298                }
4299
4300                if let Some(pack) = pack
4301                    && !short_circuit
4302                {
4303                    for message in encode_pull_native_pack_messages(&pack, "delta-aware-test", 64) {
4304                        if tx.send(Ok(message)).await.is_err() {
4305                            return;
4306                        }
4307                    }
4308                }
4309
4310                let complete = PullMessage {
4311                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
4312                        success: true,
4313                        new_state: remote_full,
4314                        error: String::new(),
4315                        transfer: Some(TransferCheckpoint {
4316                            transfer_id: "delta-aware-test".to_string(),
4317                            transport_mode: TransportMode::NativePack as i32,
4318                            resume_offset: 0,
4319                            chunk_index: 0,
4320                            checkpoint: b"heddle-markers-v1\n".to_vec(),
4321                            is_complete: true,
4322                        }),
4323                        new_revision_address: native_revision_address(svc.remote_state),
4324                    })),
4325                };
4326                let _ = tx.send(Ok(complete)).await;
4327            });
4328
4329            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4330                rx,
4331            )))
4332        }
4333    }
4334
4335    async fn connect_delta_aware_service(
4336        service: DeltaAwarePullService,
4337    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4338        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4339            Ok(listener) => listener,
4340            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4341                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4342                return None;
4343            }
4344            Err(err) => panic!("bind test server: {err}"),
4345        };
4346        let addr = listener.local_addr().expect("local addr");
4347        let incoming = futures::stream::unfold(listener, |listener| async {
4348            match listener.accept().await {
4349                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4350                Err(err) => Some((Err(err), listener)),
4351            }
4352        });
4353        let handle = tokio::spawn(async move {
4354            Server::builder()
4355                .add_service(RepoSyncServiceServer::new(service))
4356                .serve_with_incoming(incoming)
4357                .await
4358                .expect("serve delta-aware test service");
4359        });
4360        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4361            .await
4362            .expect("connect client");
4363        Some((client, handle))
4364    }
4365
4366    #[tokio::test]
4367    async fn warm_bare_pull_advertises_head_and_fires_zero_delta_short_circuit() {
4368        // Client is exactly at the remote tip. A bare pull must advertise that
4369        // tip as exclude_states; the server then returns an empty delta
4370        // (weft#215 short-circuit) and the client transfers nothing.
4371        let (_dir, repo) = temp_repo();
4372        let head = put_state_with_blob(&repo, "tip", vec![]);
4373        repo.refs()
4374            .set_thread(&ThreadName::from("main"), &head)
4375            .expect("set thread");
4376
4377        let full_closure =
4378            wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
4379        let full_pack =
4380            wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
4381        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4382
4383        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4384            remote_state: head,
4385            full_closure,
4386            delta_objects: Vec::new(),
4387            known_parent: ChangeId::generate(),
4388            full_pack: full_pack.clone(),
4389            delta_pack: full_pack,
4390            captured_exclude: captured.clone(),
4391        })
4392        .await
4393        else {
4394            return;
4395        };
4396
4397        let exchange = tokio::time::timeout(
4398            Duration::from_secs(5),
4399            client.pull_exchange(
4400                &repo,
4401                "owner/repo",
4402                "main",
4403                PullOptions {
4404                    local_thread: None,
4405                    depth: None,
4406                    target_state: None,
4407                    materialization: PullMaterialization::Full,
4408                },
4409            ),
4410        )
4411        .await
4412        .expect("warm bare pull must not hang")
4413        .expect("warm bare pull succeeds");
4414        server.abort();
4415
4416        let advertised = captured.lock().unwrap().clone().expect("request captured");
4417        assert_eq!(
4418            advertised,
4419            vec![head.to_string_full()],
4420            "a bare pull at the tip must advertise that tip in exclude_states"
4421        );
4422        assert!(exchange.result.success);
4423        assert_eq!(
4424            exchange.object_count, 0,
4425            "the zero-delta short-circuit must transfer no objects, not the full closure"
4426        );
4427    }
4428
4429    #[tokio::test]
4430    async fn behind_client_bare_pull_receives_exactly_the_missing_delta() {
4431        // The dangerous direction: client sits at the PARENT state, remote is
4432        // at the CHILD. A bare pull advertises the parent (whose closure the
4433        // client fully holds); the server sends only the child-specific
4434        // objects, and the client must end up with a COMPLETE repo (no object
4435        // it lacked is dropped).
4436        // Build parent + child in the SOURCE repo (the server's view).
4437        let (_src_dir, src_repo) = temp_repo_unseeded();
4438        let parent = put_state_with_blob(&src_repo, "base", vec![]);
4439        let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
4440        let child_closure =
4441            wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
4442
4443        // The CLIENT holds exactly the parent closure (the dangerous "behind"
4444        // setup): copy the parent's objects into a fresh client store and
4445        // track `main` at the parent.
4446        let (_dir, repo) = temp_repo_unseeded();
4447        let parent_closure =
4448            wire::enumerate_state_closure(src_repo.store(), parent).expect("parent closure");
4449        let parent_pack =
4450            wire::build_native_pack(src_repo.store(), &parent_closure).expect("parent pack");
4451        wire::install_received_pack(repo.store(), &parent_pack.pack_data, &parent_pack.index_data)
4452            .expect("install parent closure into client");
4453        repo.refs()
4454            .set_thread(&ThreadName::from("main"), &parent)
4455            .expect("set thread to parent");
4456        // Sanity: the client provably holds the parent's full closure...
4457        wire::enumerate_state_closure(repo.store(), parent).expect("client holds parent closure");
4458        // ...but NOT the child yet.
4459        assert!(
4460            repo.store().get_state(&child).expect("probe child").is_none(),
4461            "client must start without the child state"
4462        );
4463        let parent_clone = parent;
4464        let full_pack =
4465            wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
4466        // Delta = child closure minus the parent closure (what the server
4467        // would send when the parent is advertised).
4468        let delta_objects = wire::enumerate_state_closure_with_options(
4469            src_repo.store(),
4470            child,
4471            wire::StateClosureOptions {
4472                depth: None,
4473                exclude_states: vec![parent_clone],
4474            },
4475        )
4476        .expect("delta closure");
4477        assert!(
4478            !delta_objects.is_empty() && delta_objects.len() < child_closure.len(),
4479            "delta must be a strict, non-empty subset of the full closure"
4480        );
4481        let delta_pack =
4482            wire::build_native_pack(src_repo.store(), &delta_objects).expect("delta pack");
4483
4484        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4485        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4486            remote_state: child,
4487            full_closure: child_closure.clone(),
4488            delta_objects: delta_objects.clone(),
4489            known_parent: parent_clone,
4490            full_pack,
4491            delta_pack,
4492            captured_exclude: captured.clone(),
4493        })
4494        .await
4495        else {
4496            return;
4497        };
4498
4499        let exchange = tokio::time::timeout(
4500            Duration::from_secs(5),
4501            client.pull_exchange(
4502                &repo,
4503                "owner/repo",
4504                "main",
4505                PullOptions {
4506                    local_thread: None,
4507                    depth: None,
4508                    target_state: None,
4509                    materialization: PullMaterialization::Full,
4510                },
4511            ),
4512        )
4513        .await
4514        .expect("behind-client pull must not hang")
4515        .expect("behind-client pull succeeds");
4516        server.abort();
4517
4518        let advertised = captured.lock().unwrap().clone().expect("request captured");
4519        assert_eq!(
4520            advertised,
4521            vec![parent.to_string_full()],
4522            "a behind client must advertise the parent head it holds"
4523        );
4524        assert!(exchange.result.success);
4525        // The client must now hold the COMPLETE child closure — every object
4526        // in the remote's full closure must be present locally, proving the
4527        // server-side parent-pruning dropped nothing the client lacked.
4528        let reassembled = wire::enumerate_state_closure(repo.store(), child)
4529            .expect("the client must hold the complete child closure after a delta pull");
4530        assert_eq!(
4531            reassembled.len(),
4532            child_closure.len(),
4533            "delta pull must leave the client with the full child closure, no gaps"
4534        );
4535    }
4536
4537    #[tokio::test]
4538    async fn fresh_bare_pull_advertises_nothing_and_gets_full_closure() {
4539        // Unseeded local repo (the `heddle clone` shape): no local head for
4540        // the thread, so nothing is advertised and the server sends the full
4541        // closure (today's behavior).
4542        let (_dir, repo) = temp_repo_unseeded();
4543        let (_src_dir, src_repo) = temp_repo_unseeded();
4544        let remote = put_state_with_blob(&src_repo, "seed", vec![]);
4545        let full_closure =
4546            wire::enumerate_state_closure(src_repo.store(), remote).expect("closure");
4547        let full_pack =
4548            wire::build_native_pack(src_repo.store(), &full_closure).expect("full pack");
4549
4550        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4551        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4552            remote_state: remote,
4553            full_closure: full_closure.clone(),
4554            delta_objects: Vec::new(),
4555            known_parent: ChangeId::generate(),
4556            full_pack: full_pack.clone(),
4557            delta_pack: full_pack,
4558            captured_exclude: captured.clone(),
4559        })
4560        .await
4561        else {
4562            return;
4563        };
4564
4565        let exchange = tokio::time::timeout(
4566            Duration::from_secs(5),
4567            client.pull_exchange(
4568                &repo,
4569                "owner/repo",
4570                "main",
4571                PullOptions {
4572                    local_thread: None,
4573                    depth: None,
4574                    target_state: None,
4575                    materialization: PullMaterialization::Full,
4576                },
4577            ),
4578        )
4579        .await
4580        .expect("fresh bare pull must not hang")
4581        .expect("fresh bare pull succeeds");
4582        server.abort();
4583
4584        let advertised = captured.lock().unwrap().clone().expect("request captured");
4585        assert!(
4586            advertised.is_empty(),
4587            "a fresh repo must advertise no exclude_states"
4588        );
4589        assert!(exchange.result.success);
4590        assert_eq!(
4591            exchange.object_count,
4592            full_closure.len(),
4593            "a fresh pull must receive the full closure, nothing wrongly excluded"
4594        );
4595        assert!(
4596            wire::enumerate_state_closure(repo.store(), remote).is_ok(),
4597            "fresh pull must install the complete closure"
4598        );
4599    }
4600
4601    #[tokio::test]
4602    async fn explicit_local_thread_incomplete_closure_does_not_advertise_and_repairs() {
4603        // The footgun this PR closes: a user passes `--local-thread` against a
4604        // repo whose named thread head has an INCOMPLETE closure (an absent
4605        // parent state — a partial clone or interrupted prior pull). The
4606        // explicit branch must NOT advertise it (that would make the server
4607        // prune objects the client lacks and silently leave it corrupt). The
4608        // completeness gate refuses, the pull falls back to the full closure,
4609        // and the client ends COMPLETE.
4610        //
4611        // Build parent + child in the SOURCE repo (the server's view).
4612        let (_src_dir, src_repo) = temp_repo_unseeded();
4613        let parent = put_state_with_blob(&src_repo, "base", vec![]);
4614        let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
4615        let child_closure =
4616            wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
4617        let full_pack =
4618            wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
4619
4620        // The CLIENT's `feature` thread points at `child`, but the client only
4621        // holds child's own metadata, NOT the parent closure — an incomplete
4622        // local closure. Copy just the child-specific objects (closure minus
4623        // parent) so the parent state is genuinely absent.
4624        let (_dir, repo) = temp_repo_unseeded();
4625        let child_only = wire::enumerate_state_closure_with_options(
4626            src_repo.store(),
4627            child,
4628            wire::StateClosureOptions {
4629                depth: None,
4630                exclude_states: vec![parent],
4631            },
4632        )
4633        .expect("child-only objects");
4634        let child_only_pack =
4635            wire::build_native_pack(src_repo.store(), &child_only).expect("child-only pack");
4636        wire::install_received_pack(
4637            repo.store(),
4638            &child_only_pack.pack_data,
4639            &child_only_pack.index_data,
4640        )
4641        .expect("install child-only objects");
4642        repo.refs()
4643            .set_thread(&ThreadName::from("feature"), &child)
4644            .expect("set local thread to child");
4645        // Sanity: the named thread's closure is INCOMPLETE locally (parent
4646        // state absent), exactly the dangerous over-advertise setup.
4647        assert!(
4648            matches!(
4649                wire::enumerate_state_closure(repo.store(), child),
4650                Err(ProtocolError::ObjectNotFound(_))
4651            ),
4652            "the explicit thread head's closure must start incomplete"
4653        );
4654
4655        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4656        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4657            remote_state: child,
4658            full_closure: child_closure.clone(),
4659            delta_objects: Vec::new(),
4660            known_parent: parent,
4661            full_pack: full_pack.clone(),
4662            delta_pack: full_pack,
4663            captured_exclude: captured.clone(),
4664        })
4665        .await
4666        else {
4667            return;
4668        };
4669
4670        let exchange = tokio::time::timeout(
4671            Duration::from_secs(5),
4672            client.pull_exchange(
4673                &repo,
4674                "owner/repo",
4675                "main",
4676                PullOptions {
4677                    local_thread: Some("feature"),
4678                    depth: None,
4679                    target_state: None,
4680                    materialization: PullMaterialization::Full,
4681                },
4682            ),
4683        )
4684        .await
4685        .expect("explicit-thread pull must not hang")
4686        .expect("explicit-thread pull succeeds");
4687        server.abort();
4688
4689        let advertised = captured.lock().unwrap().clone().expect("request captured");
4690        assert!(
4691            advertised.is_empty(),
4692            "an explicit --local-thread with an incomplete closure must advertise nothing, \
4693             falling back to a full pull (got {advertised:?})"
4694        );
4695        assert!(exchange.result.success);
4696        // The repair: the client must now hold the COMPLETE child closure.
4697        let reassembled = wire::enumerate_state_closure(repo.store(), child)
4698            .expect("the client must hold the complete child closure after the fallback full pull");
4699        assert_eq!(
4700            reassembled.len(),
4701            child_closure.len(),
4702            "the full-pull fallback must leave the client with the complete closure, no gaps"
4703        );
4704    }
4705
4706    #[tokio::test]
4707    async fn explicit_local_thread_complete_closure_advertises_and_fires_short_circuit() {
4708        // Fast path preserved: an explicit `--local-thread` whose head closure
4709        // IS fully local must still advertise that head, so the server can
4710        // prune to the delta (here the zero-delta short-circuit, since the
4711        // client is at the remote tip).
4712        let (_dir, repo) = temp_repo();
4713        let head = put_state_with_blob(&repo, "tip", vec![]);
4714        repo.refs()
4715            .set_thread(&ThreadName::from("feature"), &head)
4716            .expect("set local thread");
4717        // Sanity: the named thread's closure is fully present locally.
4718        wire::enumerate_state_closure(repo.store(), head)
4719            .expect("client holds the complete thread closure");
4720
4721        let full_closure =
4722            wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
4723        let full_pack =
4724            wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
4725        let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4726
4727        let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4728            remote_state: head,
4729            full_closure,
4730            delta_objects: Vec::new(),
4731            known_parent: ChangeId::generate(),
4732            full_pack: full_pack.clone(),
4733            delta_pack: full_pack,
4734            captured_exclude: captured.clone(),
4735        })
4736        .await
4737        else {
4738            return;
4739        };
4740
4741        let exchange = tokio::time::timeout(
4742            Duration::from_secs(5),
4743            client.pull_exchange(
4744                &repo,
4745                "owner/repo",
4746                "main",
4747                PullOptions {
4748                    local_thread: Some("feature"),
4749                    depth: None,
4750                    target_state: None,
4751                    materialization: PullMaterialization::Full,
4752                },
4753            ),
4754        )
4755        .await
4756        .expect("explicit-thread pull must not hang")
4757        .expect("explicit-thread pull succeeds");
4758        server.abort();
4759
4760        let advertised = captured.lock().unwrap().clone().expect("request captured");
4761        assert_eq!(
4762            advertised,
4763            vec![head.to_string_full()],
4764            "an explicit --local-thread with a complete closure must advertise its head"
4765        );
4766        assert!(exchange.result.success);
4767        assert_eq!(
4768            exchange.object_count, 0,
4769            "advertising the complete head must fire the zero-delta short-circuit, not a full pull"
4770        );
4771    }
4772}