Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7    GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9    RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
10    ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
11    pull_message, push_message,
12};
13use objects::{
14    object::{ChangeId, ContentHash, MarkerName, ThreadName},
15    store::{ObjectStore, PackObjectId},
16};
17use wire::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
18use repo::{Repository, SyncedThreadMetadata, ThreadManager};
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::Request;
22
23use super::{
24    HostedGrpcClient, PullMaterialization,
25    helpers::{
26        descriptor_id, descriptor_id_from_info, object_descriptor_with_status,
27        parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
28        transport_mode_name,
29    },
30};
31
32#[derive(Clone, Copy)]
33struct PullOptions<'a> {
34    local_thread: Option<&'a str>,
35    depth: Option<u32>,
36    target_state: Option<ChangeId>,
37    materialization: PullMaterialization,
38}
39
40struct PullWantPlan {
41    wants: Vec<ObjectDescriptor>,
42    wanted_types: WantedTypes,
43    want_full_closure: bool,
44}
45
46type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
47
48#[derive(Debug, Clone, Default)]
49pub struct PullObjectMix {
50    pub blobs: usize,
51    pub trees: usize,
52    pub states: usize,
53    pub actions: usize,
54    pub redactions: usize,
55    pub state_visibilities: usize,
56}
57
58impl PullObjectMix {
59    fn record(&mut self, obj_type: ObjectType) {
60        match obj_type {
61            ObjectType::Blob => self.blobs += 1,
62            ObjectType::Tree => self.trees += 1,
63            ObjectType::State => self.states += 1,
64            ObjectType::Action => self.actions += 1,
65            ObjectType::Redaction => self.redactions += 1,
66            ObjectType::StateVisibility => self.state_visibilities += 1,
67        }
68    }
69
70    pub fn total(&self) -> usize {
71        self.blobs
72            + self.trees
73            + self.states
74            + self.actions
75            + self.redactions
76            + self.state_visibilities
77    }
78}
79
80#[derive(Debug, Clone, Default)]
81pub struct PullProfile {
82    pub ready_wait: Duration,
83    pub receive_and_apply: Duration,
84    pub decode: Duration,
85    pub store_receive_object: Duration,
86    pub metadata_sync: Duration,
87    pub pack_decode_apply: Duration,
88    pub raw_decode_apply: Duration,
89    pub pack_decode: Duration,
90    pub raw_decode: Duration,
91    pub bytes_received: usize,
92    pub pack_bytes_received: usize,
93    pub raw_bytes_received: usize,
94    pub objects_received: usize,
95    pub object_mix: PullObjectMix,
96}
97
98impl HostedGrpcClient {
99    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
100        let mut request = Request::new(ListRefsRequest {
101            repo_path: repo_path.to_string(),
102        });
103        self.apply_auth(&mut request)?;
104        let response = self
105            .inner
106            .list_refs(request)
107            .await
108            .map_err(status_to_protocol_error)?
109            .into_inner();
110        response
111            .refs
112            .into_iter()
113            .map(|entry| {
114                Ok(RefEntry {
115                    name: entry.name,
116                    change_id: ChangeId::try_from_slice(&entry.change_id)
117                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
118                    is_thread: entry.is_thread,
119                })
120            })
121            .collect()
122    }
123
124    #[allow(clippy::too_many_arguments)]
125    pub async fn update_ref(
126        &mut self,
127        repo_path: &str,
128        name: &str,
129        is_thread: bool,
130        old_value: Option<ChangeId>,
131        new_value: ChangeId,
132        force: bool,
133        thread_metadata: Option<&SyncedThreadMetadata>,
134    ) -> Result<RefUpdated, ProtocolError> {
135        let mut request = Request::new(UpdateRefRequest {
136            repo_path: repo_path.to_string(),
137            name: name.to_string(),
138            is_thread,
139            force,
140            old_value: old_value
141                .map(|value| value.to_string_full())
142                .unwrap_or_default(),
143            new_value: new_value.to_string_full(),
144            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
145            client_operation_id: String::new(),
146        });
147        self.apply_auth(&mut request)?;
148        let response = self
149            .inner
150            .update_ref(request)
151            .await
152            .map_err(status_to_protocol_error)?
153            .into_inner();
154        Ok(RefUpdated {
155            success: response.success,
156            old_value: if response.old_value.is_empty() {
157                None
158            } else {
159                Some(
160                    ChangeId::parse(&response.old_value)
161                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
162                )
163            },
164            error: (!response.error.is_empty()).then_some(response.error),
165        })
166    }
167
168    pub async fn push(
169        &mut self,
170        repo: &Repository,
171        repo_path: &str,
172        local_state: ChangeId,
173        target_thread: &str,
174        force: bool,
175    ) -> Result<PushComplete, ProtocolError> {
176        let _ = self.transport.chunk_size;
177        let _ = self.transport.resume_attempts;
178        let objects = wire::enumerate_state_closure(repo.store(), local_state)?;
179        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
180        let transport_mode = preferred_transport_mode(&self.transport, objects.len());
181        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
182        let request_message = PushMessage {
183            body: Some(push_message::Body::Request(PushRequest {
184                repo_path: repo_path.to_string(),
185                local_state: local_state.to_string_full(),
186                target_thread: target_thread.to_string(),
187                create_thread: true,
188                force,
189                objects: objects.iter().map(to_proto_object_info).collect(),
190                transfer: Some(self.transport.transfer_checkpoint_with_mode(
191                    transfer_id.clone(),
192                    transport_mode,
193                    0,
194                    0,
195                    false,
196                )),
197                partial_fetch_status: partial_fetch_status_for_repo(repo),
198                allow_partial_fetch: true,
199                thread_metadata: thread_metadata
200                    .map(|metadata| to_proto_thread_metadata(&metadata)),
201                client_operation_id: String::new(),
202            })),
203        };
204
205        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
206        tx.send(request_message).await.map_err(|_| {
207            ProtocolError::InvalidState("failed to initialize push stream".to_string())
208        })?;
209        let mut request = Request::new(ReceiverStream::new(rx));
210        self.apply_auth(&mut request)?;
211        let mut response = self
212            .inner
213            .push(request)
214            .await
215            .map_err(status_to_protocol_error)?
216            .into_inner();
217
218        let ready = match response.message().await.map_err(status_to_protocol_error)? {
219            Some(PushMessage {
220                body: Some(push_message::Body::Ready(ready)),
221            }) => ready,
222            _ => {
223                return Err(ProtocolError::InvalidState(
224                    "expected PushReady from gRPC server".to_string(),
225                ));
226            }
227        };
228
229        let object_index = objects
230            .into_iter()
231            .map(|info| (descriptor_id_from_info(&info), info))
232            .collect::<HashMap<_, _>>();
233
234        let ready_transport_mode = ready
235            .transfer
236            .as_ref()
237            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
238            .unwrap_or(transport_mode);
239        let wanted_infos = ready
240            .want_objects
241            .into_iter()
242            .map(|want| {
243                object_index
244                    .get(&descriptor_id(&want))
245                    .cloned()
246                    .ok_or_else(|| {
247                        ProtocolError::InvalidState("server requested unknown object".to_string())
248                    })
249            })
250            .collect::<Result<Vec<_>, _>>()?;
251
252        // Split want_objects: blob/tree/state/action → native pack;
253        // sidecars → out-of-pack transfer channels. Sidecars live outside
254        // `.heddle/objects/` so GC can't reach them and they can't ride the
255        // pack — `build_native_pack` skips the same object-type set.
256        let (wanted_sidecars, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
257            .into_iter()
258            .partition(|info| is_out_of_pack_transfer_object_type(info.obj_type));
259
260        if !wanted_packable.is_empty() {
261            let bundle = wire::build_native_pack(repo.store(), &wanted_packable)?;
262            for message in encode_native_pack_messages(
263                &bundle,
264                &transfer_id,
265                self.transport.chunk_size.max(1),
266                &self.transport,
267                ready_transport_mode,
268            )? {
269                tx.send(message).await.map_err(|_| {
270                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
271                })?;
272            }
273        }
274
275        for info in wanted_sidecars {
276            let message = sidecar_push_message(repo, info)?;
277            tx.send(message).await.map_err(|_| {
278                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
279            })?;
280        }
281        drop(tx);
282
283        let result = match response.message().await.map_err(status_to_protocol_error)? {
284            Some(PushMessage {
285                body: Some(push_message::Body::Complete(complete)),
286            }) => PushComplete {
287                success: complete.success,
288                new_state: if complete.new_state.is_empty() {
289                    None
290                } else {
291                    Some(
292                        ChangeId::parse(&complete.new_state)
293                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
294                    )
295                },
296                error: (!complete.error.is_empty()).then_some(complete.error),
297                transfer_id: complete
298                    .transfer
299                    .as_ref()
300                    .map(|transfer| transfer.transfer_id.clone())
301                    .unwrap_or_default(),
302                transport_mode: complete
303                    .transfer
304                    .as_ref()
305                    .map(|transfer| transport_mode_name(transfer.transport_mode))
306                    .unwrap_or("raw-objects")
307                    .to_string(),
308                resume_offset: complete
309                    .transfer
310                    .as_ref()
311                    .map(|transfer| transfer.resume_offset)
312                    .unwrap_or_default(),
313                chunk_index: complete
314                    .transfer
315                    .as_ref()
316                    .map(|transfer| transfer.chunk_index)
317                    .unwrap_or_default(),
318                checkpoint: complete
319                    .transfer
320                    .as_ref()
321                    .map(|transfer| transfer.checkpoint.clone())
322                    .unwrap_or_default(),
323                is_complete: complete
324                    .transfer
325                    .as_ref()
326                    .map(|transfer| transfer.is_complete)
327                    .unwrap_or(false),
328            },
329            _ => {
330                return Err(ProtocolError::InvalidState(
331                    "expected PushComplete from gRPC server".to_string(),
332                ));
333            }
334        };
335
336        if result.success {
337            self.sync_remote_markers(repo, repo_path, local_state)
338                .await?;
339        }
340        Ok(result)
341    }
342
343    pub async fn pull(
344        &mut self,
345        repo: &Repository,
346        repo_path: &str,
347        remote_thread: &str,
348        local_thread: Option<&str>,
349    ) -> Result<PullComplete, ProtocolError> {
350        self.pull_with_options(
351            repo,
352            repo_path,
353            remote_thread,
354            PullOptions {
355                local_thread,
356                depth: None,
357                target_state: None,
358                materialization: PullMaterialization::Full,
359            },
360        )
361        .await
362    }
363
364    pub async fn pull_profiled(
365        &mut self,
366        repo: &Repository,
367        repo_path: &str,
368        remote_thread: &str,
369        local_thread: Option<&str>,
370    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
371        self.pull_exchange(
372            repo,
373            repo_path,
374            remote_thread,
375            PullOptions {
376                local_thread,
377                depth: None,
378                target_state: None,
379                materialization: PullMaterialization::Full,
380            },
381        )
382        .await
383        .map(|exchange| (exchange.result, exchange.profile))
384    }
385
386    pub async fn pull_partial(
387        &mut self,
388        repo: &Repository,
389        repo_path: &str,
390        remote_thread: &str,
391        local_thread: Option<&str>,
392    ) -> Result<PullComplete, ProtocolError> {
393        self.pull_with_options(
394            repo,
395            repo_path,
396            remote_thread,
397            PullOptions {
398                local_thread,
399                depth: None,
400                target_state: None,
401                materialization: PullMaterialization::Lazy,
402            },
403        )
404        .await
405    }
406
407    pub async fn pull_with_depth_and_materialization(
408        &mut self,
409        repo: &Repository,
410        repo_path: &str,
411        remote_thread: &str,
412        local_thread: Option<&str>,
413        depth: Option<u32>,
414        materialization: PullMaterialization,
415    ) -> Result<PullComplete, ProtocolError> {
416        self.pull_with_options(
417            repo,
418            repo_path,
419            remote_thread,
420            PullOptions {
421                local_thread,
422                depth,
423                target_state: None,
424                materialization,
425            },
426        )
427        .await
428    }
429
430    pub async fn pull_with_depth(
431        &mut self,
432        repo: &Repository,
433        repo_path: &str,
434        remote_thread: &str,
435        local_thread: Option<&str>,
436        depth: Option<u32>,
437    ) -> Result<PullComplete, ProtocolError> {
438        self.pull_with_depth_and_materialization(
439            repo,
440            repo_path,
441            remote_thread,
442            local_thread,
443            depth,
444            PullMaterialization::Full,
445        )
446        .await
447    }
448
449    pub async fn fetch_state(
450        &mut self,
451        repo: &Repository,
452        repo_path: &str,
453        remote_thread: &str,
454        target_state: ChangeId,
455    ) -> Result<usize, ProtocolError> {
456        self.pull_exchange(
457            repo,
458            repo_path,
459            remote_thread,
460            PullOptions {
461                local_thread: None,
462                depth: None,
463                target_state: Some(target_state),
464                materialization: PullMaterialization::Full,
465            },
466        )
467        .await
468        .map(|exchange| exchange.object_count)
469    }
470
471    pub async fn fetch_state_partial(
472        &mut self,
473        repo: &Repository,
474        repo_path: &str,
475        remote_thread: &str,
476        target_state: ChangeId,
477    ) -> Result<usize, ProtocolError> {
478        self.pull_exchange(
479            repo,
480            repo_path,
481            remote_thread,
482            PullOptions {
483                local_thread: None,
484                depth: None,
485                target_state: Some(target_state),
486                materialization: PullMaterialization::Lazy,
487            },
488        )
489        .await
490        .map(|exchange| exchange.object_count)
491    }
492
493    pub async fn hydrate_blob_at_path(
494        &mut self,
495        repo: &Repository,
496        repo_path: &str,
497        reference: &str,
498        path: &str,
499    ) -> Result<objects::object::Blob, ProtocolError> {
500        let mut request = Request::new(GetBlobRequest {
501            repo_path: repo_path.to_string(),
502            r#ref: reference.to_string(),
503            path: path.to_string(),
504        });
505        self.apply_auth(&mut request)?;
506        let response = self
507            .content
508            .get_blob(request)
509            .await
510            .map_err(status_to_protocol_error)?
511            .into_inner();
512
513        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
514        let blob = objects::object::Blob::new(content);
515        repo.store().put_blob(&blob)?;
516        repo.clear_missing_blob(&blob.hash())?;
517        Ok(blob)
518    }
519
520    pub async fn hydrate_missing_blobs_for_state(
521        &mut self,
522        repo: &Repository,
523        repo_path: &str,
524        remote_thread: &str,
525        target_state: ChangeId,
526    ) -> Result<usize, ProtocolError> {
527        let exchange = self
528            .pull_exchange(
529                repo,
530                repo_path,
531                remote_thread,
532                PullOptions {
533                    local_thread: None,
534                    depth: None,
535                    target_state: Some(target_state),
536                    materialization: PullMaterialization::Full,
537                },
538            )
539            .await?;
540        clear_missing_blobs_for_state(repo, target_state)?;
541        Ok(exchange.object_count)
542    }
543
544    async fn pull_with_options(
545        &mut self,
546        repo: &Repository,
547        repo_path: &str,
548        remote_thread: &str,
549        options: PullOptions<'_>,
550    ) -> Result<PullComplete, ProtocolError> {
551        self.pull_exchange(repo, repo_path, remote_thread, options)
552            .await
553            .map(|exchange| exchange.result)
554    }
555
556    async fn pull_exchange(
557        &mut self,
558        repo: &Repository,
559        repo_path: &str,
560        remote_thread: &str,
561        options: PullOptions<'_>,
562    ) -> Result<PullExchange, ProtocolError> {
563        let exchange_start = Instant::now();
564        let mut exclude_states = Vec::new();
565        if let Some(local_thread) = options.local_thread
566            && let Some(head) = repo.refs().get_thread(&ThreadName::from(local_thread))?
567        {
568            exclude_states.push(head);
569        }
570        let allow_partial_fetch = options.materialization.allows_partial_fetch();
571        let fresh_full_pull =
572            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
573
574        let transfer_id = pull_transfer_id(
575            repo_path,
576            remote_thread,
577            options.local_thread,
578            options.depth,
579            options.target_state,
580        );
581        let request_message = PullMessage {
582            body: Some(pull_message::Body::Request(PullRequest {
583                repo_path: repo_path.to_string(),
584                remote_thread: remote_thread.to_string(),
585                local_thread: options.local_thread.unwrap_or_default().to_string(),
586                target_state: options
587                    .target_state
588                    .map(|value| value.to_string_full())
589                    .unwrap_or_default(),
590                depth: options.depth.unwrap_or_default(),
591                exclude_states: exclude_states
592                    .iter()
593                    .map(ChangeId::to_string_full)
594                    .collect(),
595                transfer: Some(self.transport.transfer_checkpoint_with_mode(
596                    transfer_id.clone(),
597                    TransportMode::NativePack,
598                    0,
599                    0,
600                    false,
601                )),
602                partial_fetch_status: partial_fetch_status_for_repo(repo),
603                allow_partial_fetch,
604                fresh_full_pull,
605                client_operation_id: String::new(),
606            })),
607        };
608
609        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
610        tx.send(request_message).await.map_err(|_| {
611            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
612        })?;
613        let mut request = Request::new(ReceiverStream::new(rx));
614        self.apply_auth(&mut request)?;
615        let mut response = self
616            .inner
617            .pull(request)
618            .await
619            .map_err(status_to_protocol_error)?
620            .into_inner();
621
622        let ready = match response.message().await.map_err(status_to_protocol_error)? {
623            Some(PullMessage {
624                body: Some(pull_message::Body::Ready(ready)),
625            }) => ready,
626            _ => {
627                return Err(ProtocolError::InvalidState(
628                    "expected PullReady from gRPC server".to_string(),
629                ));
630            }
631        };
632        let mut profile = PullProfile {
633            ready_wait: exchange_start.elapsed(),
634            ..PullProfile::default()
635        };
636        let remote_state = ChangeId::parse(&ready.remote_state)
637            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
638        let PullWantPlan {
639            wants,
640            wanted_types,
641            want_full_closure,
642        } = plan_pull_wants(
643            repo,
644            &remote_state,
645            ready.full_closure_available,
646            ready.objects_to_fetch,
647            allow_partial_fetch,
648        )?;
649        let native_pack_required = native_pack_required_for_pull(want_full_closure, &wanted_types);
650
651        tx.send(PullMessage {
652            body: Some(pull_message::Body::Want(WantObjects {
653                objects: wants.clone(),
654                want_full_closure,
655                transfer: Some(self.transport.transfer_checkpoint_with_mode(
656                    transfer_id.clone(),
657                    TransportMode::NativePack,
658                    0,
659                    0,
660                    false,
661                )),
662            })),
663        })
664        .await
665        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
666        drop(tx);
667
668        let receive_start = Instant::now();
669        let mut pack_state = wire::PackChunkState::default();
670        let mut received = 0usize;
671        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
672            match message.body {
673                Some(pull_message::Body::Pack(chunk)) => {
674                    profile.bytes_received =
675                        profile.bytes_received.saturating_add(chunk.data.len());
676                    profile.pack_bytes_received =
677                        profile.pack_bytes_received.saturating_add(chunk.data.len());
678                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
679                        ProtocolError::InvalidState(
680                            "native pack chunk missing transfer checkpoint".to_string(),
681                        )
682                    })?;
683                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
684                        .unwrap_or(PackStreamKind::Unspecified);
685                    if stream_kind == PackStreamKind::Unspecified {
686                        return Err(ProtocolError::InvalidState(
687                            "native pack chunk missing stream kind".to_string(),
688                        ));
689                    }
690                    let decode_start = Instant::now();
691                    wire::receive_pack_chunk(
692                        &mut pack_state,
693                        stream_kind == PackStreamKind::Index,
694                        transfer.resume_offset,
695                        transfer.chunk_index,
696                        transfer.is_complete,
697                        &chunk.data,
698                        chunk.is_final_chunk,
699                    )?;
700                    let decode_elapsed = decode_start.elapsed();
701                    profile.pack_decode += decode_elapsed;
702                    profile.pack_decode_apply += decode_elapsed;
703                    profile.decode += decode_elapsed;
704                }
705                Some(pull_message::Body::Redaction(transfer)) => {
706                    // Out-of-pack channel: receive a redaction sidecar
707                    // and route through `Repository::accept_wire_redactions`
708                    // for signature + trust-list verification. The
709                    // server emitted these only for blobs in our want
710                    // set that carry an active redaction.
711                    wire::check_received_transfer_blob_size(
712                        transfer.redactions_blob.len(),
713                        wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
714                        "redactions",
715                    )?;
716                    profile.bytes_received = profile
717                        .bytes_received
718                        .saturating_add(transfer.redactions_blob.len());
719                    profile.object_mix.record(ObjectType::Redaction);
720                    let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
721                        ProtocolError::InvalidState(format!(
722                            "RedactionTransfer.blob_hash is not a valid content hash: {err}"
723                        ))
724                    })?;
725                    let decode_start = Instant::now();
726                    repo.accept_wire_redactions(blob, &transfer.redactions_blob)
727                        .map_err(|err| {
728                            ProtocolError::InvalidState(format!(
729                                "accept_wire_redactions for blob {}: {err}",
730                                transfer.blob_hash
731                            ))
732                        })?;
733                    let decode_elapsed = decode_start.elapsed();
734                    profile.store_receive_object += decode_elapsed;
735                }
736                Some(pull_message::Body::StateVisibility(transfer)) => {
737                    wire::check_received_transfer_blob_size(
738                        transfer.state_visibility_blob.len(),
739                        wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
740                        "state-visibility",
741                    )?;
742                    profile.bytes_received = profile
743                        .bytes_received
744                        .saturating_add(transfer.state_visibility_blob.len());
745                    profile.object_mix.record(ObjectType::StateVisibility);
746                    let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
747                        ProtocolError::InvalidState(format!(
748                            "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
749                        ))
750                    })?;
751                    let decode_start = Instant::now();
752                    repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
753                        .map_err(|err| {
754                            ProtocolError::InvalidState(format!(
755                                "accept_wire_state_visibility for state {}: {err}",
756                                transfer.state_id
757                            ))
758                        })?;
759                    let decode_elapsed = decode_start.elapsed();
760                    profile.store_receive_object += decode_elapsed;
761                }
762                Some(pull_message::Body::Complete(complete)) => {
763                    profile.receive_and_apply = receive_start.elapsed();
764                    let final_state = if complete.new_state.is_empty() {
765                        None
766                    } else {
767                        Some(
768                            ChangeId::parse(&complete.new_state)
769                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
770                        )
771                    };
772
773                    if complete.success {
774                        if native_pack_required {
775                            if !pack_state.is_complete() {
776                                return Err(ProtocolError::InvalidState(
777                                    "pull completed before native pack stream finished".to_string(),
778                                ));
779                            }
780                            let store_start = Instant::now();
781                            let installed_ids = wire::install_received_pack(
782                                repo.store(),
783                                &pack_state.pack_data,
784                                &pack_state.index_data,
785                            )?;
786                            profile.store_receive_object += store_start.elapsed();
787                            received = installed_ids.len();
788                            for id in installed_ids {
789                                match (id, wanted_packable_type(&wanted_types, &id)) {
790                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
791                                        profile.object_mix.record(ObjectType::Blob);
792                                        repo.clear_missing_blob(&hash)?;
793                                    }
794                                    (_, Some(obj_type)) => {
795                                        profile.object_mix.record(obj_type);
796                                    }
797                                    (PackObjectId::ChangeId(_), None) => {
798                                        profile.object_mix.record(ObjectType::State);
799                                    }
800                                    (PackObjectId::Hash(hash), None) => {
801                                        let inferred =
802                                            infer_installed_hash_object_type(repo, &hash)?;
803                                        profile.object_mix.record(inferred);
804                                    }
805                                }
806                            }
807                        }
808
809                        let metadata_start = Instant::now();
810                        if let Some(local_thread) = options.local_thread
811                            && let Some(state) = final_state
812                        {
813                            repo.refs()
814                                .set_thread(&ThreadName::from(local_thread), &state)?;
815                        }
816                        if let Some(state) = final_state
817                            && allow_partial_fetch
818                        {
819                            mark_missing_blobs_for_state(repo, state)?;
820                        } else if final_state.is_some() {
821                            let _ = repo.clear_all_missing_blobs()?;
822                        }
823                        let synced_markers = complete
824                            .transfer
825                            .as_ref()
826                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
827                            .transpose()?
828                            .unwrap_or(false);
829                        if !synced_markers {
830                            self.sync_local_markers(repo, repo_path).await?;
831                        }
832                        profile.metadata_sync = metadata_start.elapsed();
833                        profile.objects_received = received;
834                        return Ok(PullExchange {
835                            result: PullComplete {
836                                success: true,
837                                final_state,
838                                error: None,
839                                transfer_id: complete
840                                    .transfer
841                                    .as_ref()
842                                    .map(|transfer| transfer.transfer_id.clone())
843                                    .unwrap_or_default(),
844                                transport_mode: complete
845                                    .transfer
846                                    .as_ref()
847                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
848                                    .unwrap_or("native-pack")
849                                    .to_string(),
850                                resume_offset: complete
851                                    .transfer
852                                    .as_ref()
853                                    .map(|transfer| transfer.resume_offset)
854                                    .unwrap_or_default(),
855                                chunk_index: complete
856                                    .transfer
857                                    .as_ref()
858                                    .map(|transfer| transfer.chunk_index)
859                                    .unwrap_or_default(),
860                                checkpoint: complete
861                                    .transfer
862                                    .as_ref()
863                                    .map(|transfer| transfer.checkpoint.clone())
864                                    .unwrap_or_default(),
865                                is_complete: complete
866                                    .transfer
867                                    .as_ref()
868                                    .map(|transfer| transfer.is_complete)
869                                    .unwrap_or(false),
870                            },
871                            object_count: received,
872                            profile,
873                        });
874                    }
875
876                    profile.objects_received = received;
877                    return Ok(PullExchange {
878                        result: PullComplete {
879                            success: false,
880                            final_state,
881                            error: (!complete.error.is_empty()).then_some(complete.error),
882                            transfer_id: complete
883                                .transfer
884                                .as_ref()
885                                .map(|transfer| transfer.transfer_id.clone())
886                                .unwrap_or_default(),
887                            transport_mode: complete
888                                .transfer
889                                .as_ref()
890                                .map(|transfer| transport_mode_name(transfer.transport_mode))
891                                .unwrap_or("native-pack")
892                                .to_string(),
893                            resume_offset: complete
894                                .transfer
895                                .as_ref()
896                                .map(|transfer| transfer.resume_offset)
897                                .unwrap_or_default(),
898                            chunk_index: complete
899                                .transfer
900                                .as_ref()
901                                .map(|transfer| transfer.chunk_index)
902                                .unwrap_or_default(),
903                            checkpoint: complete
904                                .transfer
905                                .as_ref()
906                                .map(|transfer| transfer.checkpoint.clone())
907                                .unwrap_or_default(),
908                            is_complete: complete
909                                .transfer
910                                .as_ref()
911                                .map(|transfer| transfer.is_complete)
912                                .unwrap_or(false),
913                        },
914                        object_count: received,
915                        profile,
916                    });
917                }
918                _ => {}
919            }
920        }
921
922        Err(ProtocolError::InvalidState(format!(
923            "pull stream ended unexpectedly after receiving {received} packed objects"
924        )))
925    }
926}
927
928fn redaction_push_message(
929    repo: &Repository,
930    info: wire::ObjectInfo,
931) -> Result<PushMessage, ProtocolError> {
932    let wire::ObjectId::Hash(blob) = info.id else {
933        return Err(ProtocolError::InvalidState(
934            "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
935        ));
936    };
937    let hex = blob.to_hex();
938    // Sender-side: load the byte-identical sidecar payload
939    // that `Repository::put_redaction` wrote to disk. The
940    // receiver verifies the signature + trust list and then
941    // persists these bytes verbatim.
942    let bytes = repo
943        .store()
944        .get_redactions_bytes_for_blob(&blob)
945        .map_err(|err| {
946            ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
947        })?
948        .ok_or_else(|| {
949            ProtocolError::InvalidState(format!(
950                "server wants redaction for blob {} but sender has no sidecar",
951                hex
952            ))
953        })?;
954    Ok(PushMessage {
955        body: Some(push_message::Body::Redaction(RedactionTransfer {
956            blob_hash: hex,
957            redactions_blob: bytes,
958        })),
959    })
960}
961
962fn is_out_of_pack_transfer_object_type(obj_type: ObjectType) -> bool {
963    matches!(
964        obj_type,
965        ObjectType::Redaction | ObjectType::StateVisibility
966    )
967}
968
969fn native_pack_required_for_pull(want_full_closure: bool, wanted_types: &WantedTypes) -> bool {
970    want_full_closure
971        || wanted_types
972            .values()
973            .flatten()
974            .copied()
975            .any(wire::is_native_packable_object_type)
976}
977
978fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
979    let types = wanted_types.entry(pack_id).or_default();
980    if !types.contains(&obj_type) {
981        types.push(obj_type);
982    }
983}
984
985fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
986    wanted_types.get(pack_id).and_then(|types| {
987        types
988            .iter()
989            .copied()
990            .find(|obj_type| wire::is_native_packable_object_type(*obj_type))
991    })
992}
993
994fn sidecar_push_message(
995    repo: &Repository,
996    info: wire::ObjectInfo,
997) -> Result<PushMessage, ProtocolError> {
998    match info.obj_type {
999        ObjectType::Redaction => redaction_push_message(repo, info),
1000        ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1001        obj_type => Err(ProtocolError::InvalidState(format!(
1002            "{obj_type:?} is not an out-of-pack sidecar object"
1003        ))),
1004    }
1005}
1006
1007fn state_visibility_push_message(
1008    repo: &Repository,
1009    info: wire::ObjectInfo,
1010) -> Result<PushMessage, ProtocolError> {
1011    let wire::ObjectId::ChangeId(state) = info.id else {
1012        return Err(ProtocolError::InvalidState(
1013            "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1014        ));
1015    };
1016    let state_id = state.to_string_full();
1017    let bytes = repo
1018        .get_state_visibility_bytes_for_state(&state)
1019        .map_err(|err| {
1020            ProtocolError::InvalidState(format!(
1021                "load state-visibility sidecar for {}: {err}",
1022                state_id
1023            ))
1024        })?
1025        .ok_or_else(|| {
1026            ProtocolError::InvalidState(format!(
1027                "server wants state visibility for state {} but sender has no sidecar",
1028                state_id
1029            ))
1030        })?;
1031    Ok(PushMessage {
1032        body: Some(push_message::Body::StateVisibility(
1033            StateVisibilityTransfer {
1034                state_id,
1035                state_visibility_blob: bytes,
1036            },
1037        )),
1038    })
1039}
1040
1041fn load_thread_metadata(
1042    repo: &Repository,
1043    target_thread: &str,
1044    local_state: ChangeId,
1045) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1046    let thread_manager = ThreadManager::new(repo.heddle_dir());
1047    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1048}
1049
1050fn plan_pull_wants(
1051    repo: &Repository,
1052    remote_state: &ChangeId,
1053    full_closure_available: bool,
1054    objects_to_fetch: Vec<ObjectDescriptor>,
1055    allow_partial_fetch: bool,
1056) -> Result<PullWantPlan, ProtocolError> {
1057    if full_closure_available {
1058        return Ok(PullWantPlan {
1059            wants: Vec::new(),
1060            wanted_types: HashMap::new(),
1061            want_full_closure: true,
1062        });
1063    }
1064    let request_full_closure =
1065        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1066    let mut wants = Vec::with_capacity(objects_to_fetch.len());
1067    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1068
1069    for descriptor in objects_to_fetch {
1070        let info = parse_descriptor_to_info(descriptor)?;
1071        let pack_id = match &info.id {
1072            wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1073            wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1074        };
1075        let include = if request_full_closure {
1076            true
1077        } else {
1078            let has = wire::has_object(repo.store(), &info)?;
1079            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1080        };
1081
1082        if include {
1083            record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1084            wants.push(object_descriptor_with_status(
1085                &info,
1086                ObjectAvailabilityStatus::Missing,
1087                "requested by client",
1088            ));
1089        }
1090    }
1091
1092    Ok(PullWantPlan {
1093        wants,
1094        wanted_types,
1095        want_full_closure: false,
1096    })
1097}
1098
1099fn supports_compact_full_pull(
1100    repo: &Repository,
1101    allow_partial_fetch: bool,
1102    exclude_states: &[ChangeId],
1103) -> Result<bool, ProtocolError> {
1104    if allow_partial_fetch || !exclude_states.is_empty() {
1105        return Ok(false);
1106    }
1107    repo_looks_fresh(repo)
1108}
1109
1110fn should_request_full_closure(
1111    repo: &Repository,
1112    remote_state: &ChangeId,
1113    allow_partial_fetch: bool,
1114) -> Result<bool, ProtocolError> {
1115    if allow_partial_fetch || repo.store().has_state(remote_state)? {
1116        return Ok(false);
1117    }
1118    repo_looks_fresh(repo)
1119}
1120
1121fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1122    if repo.head()?.is_some() {
1123        return Ok(false);
1124    }
1125    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1126        return Ok(false);
1127    }
1128    Ok(repo.missing_blobs()?.is_empty())
1129}
1130
1131fn infer_installed_hash_object_type(
1132    repo: &Repository,
1133    hash: &ContentHash,
1134) -> Result<ObjectType, ProtocolError> {
1135    let store = repo.store();
1136    if store.get_tree(hash)?.is_some() {
1137        return Ok(ObjectType::Tree);
1138    }
1139    if store
1140        .get_action(&objects::object::ActionId::from_hash(*hash))?
1141        .is_some()
1142    {
1143        return Ok(ObjectType::Action);
1144    }
1145    Ok(ObjectType::Blob)
1146}
1147
1148fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1149    const HEADER: &str = "heddle-markers-v1\n";
1150    if checkpoint.is_empty() {
1151        return Ok(false);
1152    }
1153    let payload = std::str::from_utf8(checkpoint)
1154        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1155    let Some(lines) = payload.strip_prefix(HEADER) else {
1156        return Ok(false);
1157    };
1158
1159    for line in lines.lines() {
1160        if line.is_empty() {
1161            continue;
1162        }
1163        let Some((name, change_id)) = line.split_once('\t') else {
1164            return Err(ProtocolError::InvalidState(
1165                "invalid marker snapshot line".to_string(),
1166            ));
1167        };
1168        let change_id = ChangeId::parse(change_id)
1169            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1170        if !repo.store().has_state(&change_id)? {
1171            continue;
1172        }
1173        let name = MarkerName::from(name);
1174        match repo.refs().get_marker(&name)? {
1175            Some(existing) if existing == change_id => {}
1176            Some(existing) => repo.refs().set_marker_cas(
1177                &name,
1178                refs::RefExpectation::Value(existing),
1179                &change_id,
1180            )?,
1181            None => repo.refs().create_marker(&name, &change_id)?,
1182        }
1183    }
1184
1185    Ok(true)
1186}
1187
1188fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1189    if s.is_empty() {
1190        return Vec::new();
1191    }
1192    objects::object::ChangeId::parse(s)
1193        .map(|id| id.as_bytes().to_vec())
1194        .unwrap_or_default()
1195}
1196
1197fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1198    ThreadMetadata {
1199        name: metadata.thread.clone(),
1200        target_thread: metadata.target_thread.clone(),
1201        parent_thread: metadata.parent_thread.clone(),
1202        task: metadata.task.clone(),
1203        thread_mode: metadata.mode.to_string(),
1204        thread_state: metadata.state.to_string(),
1205        freshness: metadata.freshness.to_string(),
1206        base_state: change_id_string_to_bytes(&metadata.base_state),
1207        base_root: change_id_string_to_bytes(&metadata.base_root),
1208        current_state: metadata
1209            .current_state
1210            .as_deref()
1211            .map(change_id_string_to_bytes),
1212        merged_state: metadata
1213            .merged_state
1214            .as_deref()
1215            .map(change_id_string_to_bytes),
1216        changed_paths: metadata.changed_paths.clone(),
1217        impact_categories: metadata
1218            .impact_categories
1219            .iter()
1220            .map(ToString::to_string)
1221            .collect(),
1222        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1223        promotion_suggested: metadata.promotion_suggested,
1224        verification_summary: Some(ThreadVerificationSummary {
1225            tests_passed: metadata.verification_summary.tests_passed,
1226            tests_failed: metadata
1227                .verification_summary
1228                .tests_failed
1229                .unwrap_or_default(),
1230            coverage_pct: metadata.verification_summary.coverage_pct,
1231            lint_warnings: metadata.verification_summary.lint_warnings,
1232        }),
1233        confidence_summary: Some(ThreadConfidenceSummary {
1234            value: metadata.confidence_summary.value,
1235            band: metadata
1236                .confidence_summary
1237                .band
1238                .as_ref()
1239                .map(ToString::to_string),
1240        }),
1241        integration_policy_result: Some(ThreadIntegrationPolicy {
1242            status: metadata
1243                .integration_policy_result
1244                .status
1245                .clone()
1246                .unwrap_or_default(),
1247            reason: metadata
1248                .integration_policy_result
1249                .reason
1250                .clone()
1251                .unwrap_or_default(),
1252        }),
1253        created_at: Some(prost_types::Timestamp {
1254            seconds: metadata.created_at.timestamp(),
1255            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1256        }),
1257        updated_at: Some(prost_types::Timestamp {
1258            seconds: metadata.updated_at.timestamp(),
1259            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1260        }),
1261    }
1262}
1263
1264struct PullExchange {
1265    result: PullComplete,
1266    object_count: usize,
1267    profile: PullProfile,
1268}
1269
1270fn mark_missing_blobs_for_state(
1271    repo: &Repository,
1272    state_id: ChangeId,
1273) -> Result<(), ProtocolError> {
1274    let state = repo
1275        .store()
1276        .get_state(&state_id)?
1277        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1278    let mut missing = collect_missing_blobs(repo, &state.tree)?;
1279    if let Some(context_root) = state.context.as_ref() {
1280        missing.extend(collect_missing_blobs(repo, context_root)?);
1281    }
1282    missing
1283        .into_iter()
1284        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1285}
1286
1287fn clear_missing_blobs_for_state(
1288    repo: &Repository,
1289    state_id: ChangeId,
1290) -> Result<(), ProtocolError> {
1291    let state = repo
1292        .store()
1293        .get_state(&state_id)?
1294        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1295    let mut missing = collect_missing_blobs(repo, &state.tree)?;
1296    if let Some(context_root) = state.context.as_ref() {
1297        missing.extend(collect_missing_blobs(repo, context_root)?);
1298    }
1299    missing
1300        .into_iter()
1301        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1302}
1303
1304fn collect_missing_blobs(
1305    repo: &Repository,
1306    tree_hash: &ContentHash,
1307) -> Result<Vec<ContentHash>, ProtocolError> {
1308    let mut missing = Vec::new();
1309    collect_missing_blobs_recursive(repo, tree_hash, &mut missing)?;
1310    Ok(missing)
1311}
1312
1313fn collect_missing_blobs_recursive(
1314    repo: &Repository,
1315    tree_hash: &ContentHash,
1316    missing: &mut Vec<ContentHash>,
1317) -> Result<(), ProtocolError> {
1318    let Some(tree) = repo.store().get_tree(tree_hash).map_err(|err| {
1319        ProtocolError::InvalidState(format!(
1320            "load tree {} while collecting lazy hydration missing blobs: {err}",
1321            tree_hash.to_hex()
1322        ))
1323    })?
1324    else {
1325        return Ok(());
1326    };
1327    for entry in tree.entries() {
1328        match entry.entry_type {
1329            objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1330                if !repo.store().has_blob(&entry.hash).map_err(|err| {
1331                    ProtocolError::InvalidState(format!(
1332                        "check blob {} while collecting lazy hydration missing blobs: {err}",
1333                        entry.hash.to_hex()
1334                    ))
1335                })? {
1336                    missing.push(entry.hash);
1337                }
1338            }
1339            objects::object::EntryType::Tree => {
1340                collect_missing_blobs_recursive(repo, &entry.hash, missing)?;
1341            }
1342        }
1343    }
1344    Ok(())
1345}
1346
1347fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1348    match repo.missing_blobs() {
1349        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1350        Ok(_) => PartialFetchStatus::Disabled as i32,
1351        Err(_) => PartialFetchStatus::Unspecified as i32,
1352    }
1353}
1354
1355fn pull_transfer_id(
1356    repo_path: &str,
1357    remote_thread: &str,
1358    local_thread: Option<&str>,
1359    depth: Option<u32>,
1360    target_state: Option<ChangeId>,
1361) -> String {
1362    format!(
1363        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1364        local_thread.unwrap_or_default(),
1365        target_state
1366            .map(|value| value.to_string_full())
1367            .unwrap_or_default()
1368    )
1369}
1370
1371fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1372    format!(
1373        "push:{repo_path}:{}:{target_thread}",
1374        local_state.to_string_full()
1375    )
1376}
1377
1378fn encode_native_pack_messages(
1379    bundle: &wire::NativePackBundle,
1380    transfer_id: &str,
1381    chunk_size: usize,
1382    transport: &super::helpers::HostedTransportPolicy,
1383    transport_mode: TransportMode,
1384) -> Result<Vec<PushMessage>, ProtocolError> {
1385    let mut messages = Vec::new();
1386    let chunk_size = chunk_size.max(1);
1387
1388    let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
1389    for chunk_index in 0..pack_total_chunks.max(1) {
1390        let Some((start, len)) =
1391            wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1392        else {
1393            break;
1394        };
1395        messages.push(PushMessage {
1396            body: Some(push_message::Body::Pack(PackChunk {
1397                stream_kind: PackStreamKind::Pack as i32,
1398                data: bundle.pack_data[start..start + len].to_vec(),
1399                transfer: Some(transport.transfer_checkpoint_with_mode(
1400                    transfer_id,
1401                    transport_mode,
1402                    chunk_index as u32,
1403                    start as u64,
1404                    chunk_index + 1 == pack_total_chunks,
1405                )),
1406                chunk_length: len as u32,
1407                is_final_chunk: chunk_index + 1 == pack_total_chunks,
1408            })),
1409        });
1410    }
1411
1412    let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
1413    for chunk_index in 0..index_total_chunks.max(1) {
1414        let Some((start, len)) =
1415            wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1416        else {
1417            break;
1418        };
1419        messages.push(PushMessage {
1420            body: Some(push_message::Body::Pack(PackChunk {
1421                stream_kind: PackStreamKind::Index as i32,
1422                data: bundle.index_data[start..start + len].to_vec(),
1423                transfer: Some(transport.transfer_checkpoint_with_mode(
1424                    transfer_id,
1425                    transport_mode,
1426                    chunk_index as u32,
1427                    start as u64,
1428                    chunk_index + 1 == index_total_chunks,
1429                )),
1430                chunk_length: len as u32,
1431                is_final_chunk: chunk_index + 1 == index_total_chunks,
1432            })),
1433        });
1434    }
1435    Ok(messages)
1436}
1437
1438fn preferred_transport_mode(
1439    transport: &super::helpers::HostedTransportPolicy,
1440    object_count: usize,
1441) -> TransportMode {
1442    let _ = transport;
1443    let _ = object_count;
1444    TransportMode::NativePack
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449    use chrono::{TimeZone, Utc};
1450    use cli_shared::ClientConfig;
1451    use grpc::heddle::v1::{
1452        ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
1453        TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
1454        repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
1455    };
1456    use objects::{
1457        object::{
1458            Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
1459            StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
1460        },
1461        store::ObjectStore,
1462    };
1463    use wire::{ObjectId, ObjectInfo};
1464    use tempfile::TempDir;
1465    use tonic::{Response, Status, transport::Server};
1466
1467    use super::*;
1468
1469    fn temp_repo() -> (TempDir, Repository) {
1470        let dir = TempDir::new().expect("tempdir");
1471        let repo = Repository::init_default(dir.path()).expect("init repo");
1472        (dir, repo)
1473    }
1474
1475    fn redaction_info(blob: ContentHash) -> ObjectInfo {
1476        ObjectInfo {
1477            id: ObjectId::Hash(blob),
1478            obj_type: ObjectType::Redaction,
1479            size: 0,
1480            delta_base: None,
1481        }
1482    }
1483
1484    fn state_info(state: ChangeId) -> ObjectInfo {
1485        ObjectInfo {
1486            id: ObjectId::ChangeId(state),
1487            obj_type: ObjectType::State,
1488            size: 0,
1489            delta_base: None,
1490        }
1491    }
1492
1493    fn state_visibility_info(state: ChangeId) -> ObjectInfo {
1494        ObjectInfo {
1495            id: ObjectId::ChangeId(state),
1496            obj_type: ObjectType::StateVisibility,
1497            size: 0,
1498            delta_base: None,
1499        }
1500    }
1501
1502    fn sample_blob() -> ContentHash {
1503        ContentHash::from_bytes([7u8; 32])
1504    }
1505
1506    #[test]
1507    fn descriptor_id_from_info_matches_proto_encode_path() {
1508        let infos = [
1509            redaction_info(sample_blob()),
1510            state_info(ChangeId::from_bytes([3u8; 16])),
1511            state_visibility_info(ChangeId::from_bytes([9u8; 16])),
1512        ];
1513        for info in infos {
1514            assert_eq!(
1515                descriptor_id_from_info(&info),
1516                descriptor_id(&to_proto_object_info(&info)),
1517                "keying path must stay byte-identical to the throwaway-encode path",
1518            );
1519        }
1520    }
1521
1522    fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
1523        let hex = hash.to_hex();
1524        let (prefix, rest) = hex.split_at(2);
1525        repo.heddle_dir()
1526            .join("objects")
1527            .join("trees")
1528            .join(prefix)
1529            .join(rest)
1530    }
1531
1532    fn sample_redaction(blob: ContentHash) -> Redaction {
1533        Redaction {
1534            redacted_blob: blob,
1535            state: ChangeId::from_bytes([1u8; 16]),
1536            path: "config/secrets.toml".into(),
1537            reason: "leaked credential".into(),
1538            redactor: Principal {
1539                name: "Grace Hopper".into(),
1540                email: "grace@example.com".into(),
1541            },
1542            redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
1543            signature: None,
1544            purged_at: None,
1545            supersedes: None,
1546        }
1547    }
1548
1549    fn sample_state_visibility(state: ChangeId) -> StateVisibility {
1550        StateVisibility {
1551            state,
1552            tier: VisibilityTier::Restricted {
1553                scope_label: "security-embargo".into(),
1554            },
1555            embargo_until: None,
1556            declarer: Principal {
1557                name: "Grace Hopper".into(),
1558                email: "grace@example.com".into(),
1559            },
1560            declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
1561            signature: None,
1562            supersedes: None,
1563        }
1564    }
1565
1566    #[test]
1567    fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
1568        for obj_type in wire::native_pack_excluded_object_types() {
1569            assert!(
1570                is_out_of_pack_transfer_object_type(*obj_type),
1571                "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
1572            );
1573        }
1574    }
1575
1576    #[test]
1577    fn native_pack_required_tracks_packable_pull_wants() {
1578        let blob = sample_blob();
1579        let state = ChangeId::from_bytes([9u8; 16]);
1580
1581        let sidecar_only = HashMap::from([(
1582            PackObjectId::ChangeId(state),
1583            vec![ObjectType::StateVisibility],
1584        )]);
1585        assert!(!native_pack_required_for_pull(false, &sidecar_only));
1586
1587        let redaction_only =
1588            HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Redaction])]);
1589        assert!(!native_pack_required_for_pull(false, &redaction_only));
1590
1591        let packable = HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Blob])]);
1592        assert!(native_pack_required_for_pull(false, &packable));
1593
1594        let state_with_sidecar = HashMap::from([(
1595            PackObjectId::ChangeId(state),
1596            vec![ObjectType::State, ObjectType::StateVisibility],
1597        )]);
1598        assert!(native_pack_required_for_pull(false, &state_with_sidecar));
1599        assert!(native_pack_required_for_pull(true, &HashMap::new()));
1600    }
1601
1602    #[test]
1603    fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
1604        let (_dir, repo) = temp_repo();
1605        let state = ChangeId::from_bytes([9u8; 16]);
1606        let plan = plan_pull_wants(
1607            &repo,
1608            &state,
1609            false,
1610            vec![
1611                object_descriptor_with_status(
1612                    &state_info(state),
1613                    ObjectAvailabilityStatus::Missing,
1614                    "missing state",
1615                ),
1616                object_descriptor_with_status(
1617                    &state_visibility_info(state),
1618                    ObjectAvailabilityStatus::Missing,
1619                    "missing state visibility",
1620                ),
1621            ],
1622            false,
1623        )
1624        .expect("plan pull wants");
1625
1626        let wanted = plan
1627            .wanted_types
1628            .get(&PackObjectId::ChangeId(state))
1629            .expect("same ChangeId want entry");
1630        assert_eq!(
1631            wanted.as_slice(),
1632            &[ObjectType::State, ObjectType::StateVisibility]
1633        );
1634        assert!(native_pack_required_for_pull(
1635            plan.want_full_closure,
1636            &plan.wanted_types
1637        ));
1638    }
1639
1640    #[derive(Clone)]
1641    struct SidecarOnlyPullService {
1642        state: ChangeId,
1643        state_visibility_blob: Vec<u8>,
1644    }
1645
1646    #[tonic::async_trait]
1647    impl RepoSyncService for SidecarOnlyPullService {
1648        async fn list_refs(
1649            &self,
1650            _request: tonic::Request<ListRefsRequest>,
1651        ) -> Result<Response<ListRefsResponse>, Status> {
1652            Ok(Response::new(ListRefsResponse::default()))
1653        }
1654
1655        async fn update_ref(
1656            &self,
1657            _request: tonic::Request<UpdateRefRequest>,
1658        ) -> Result<Response<UpdateRefResponse>, Status> {
1659            Ok(Response::new(UpdateRefResponse::default()))
1660        }
1661
1662        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
1663
1664        async fn push(
1665            &self,
1666            _request: tonic::Request<tonic::Streaming<PushMessage>>,
1667        ) -> Result<Response<Self::PushStream>, Status> {
1668            let (_tx, rx) = mpsc::channel(1);
1669            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
1670                rx,
1671            )))
1672        }
1673
1674        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
1675
1676        async fn pull(
1677            &self,
1678            request: tonic::Request<tonic::Streaming<PullMessage>>,
1679        ) -> Result<Response<Self::PullStream>, Status> {
1680            let state = self.state;
1681            let state_visibility_blob = self.state_visibility_blob.clone();
1682            let (tx, rx) = mpsc::channel(4);
1683
1684            tokio::spawn(async move {
1685                let mut inbound = request.into_inner();
1686                match inbound.message().await {
1687                    Ok(Some(PullMessage {
1688                        body: Some(pull_message::Body::Request(_)),
1689                    })) => {}
1690                    other => {
1691                        let _ = tx
1692                            .send(Err(Status::invalid_argument(format!(
1693                                "expected pull request, got {other:?}"
1694                            ))))
1695                            .await;
1696                        return;
1697                    }
1698                }
1699
1700                let descriptor = object_descriptor_with_status(
1701                    &state_visibility_info(state),
1702                    ObjectAvailabilityStatus::Missing,
1703                    "missing state visibility",
1704                );
1705                let ready = PullMessage {
1706                    body: Some(pull_message::Body::Ready(PullReady {
1707                        remote_state: state.to_string_full(),
1708                        objects_to_fetch: vec![descriptor],
1709                        transfer: None,
1710                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
1711                        missing_objects: Vec::new(),
1712                        full_closure_available: false,
1713                        object_count: 1,
1714                    })),
1715                };
1716                if tx.send(Ok(ready)).await.is_err() {
1717                    return;
1718                }
1719
1720                match inbound.message().await {
1721                    Ok(Some(PullMessage {
1722                        body: Some(pull_message::Body::Want(want)),
1723                    })) if !want.want_full_closure
1724                        && want.objects.len() == 1
1725                        && want.objects[0].object_type == "state_visibility" => {}
1726                    other => {
1727                        let _ = tx
1728                            .send(Err(Status::invalid_argument(format!(
1729                                "expected sidecar-only want, got {other:?}"
1730                            ))))
1731                            .await;
1732                        return;
1733                    }
1734                }
1735
1736                let transfer = PullMessage {
1737                    body: Some(pull_message::Body::StateVisibility(
1738                        StateVisibilityTransfer {
1739                            state_id: state.to_string_full(),
1740                            state_visibility_blob,
1741                        },
1742                    )),
1743                };
1744                if tx.send(Ok(transfer)).await.is_err() {
1745                    return;
1746                }
1747
1748                let complete = PullMessage {
1749                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
1750                        success: true,
1751                        new_state: state.to_string_full(),
1752                        error: String::new(),
1753                        transfer: Some(TransferCheckpoint {
1754                            transfer_id: "sidecar-only-test".to_string(),
1755                            transport_mode: TransportMode::NativePack as i32,
1756                            resume_offset: 0,
1757                            chunk_index: 0,
1758                            checkpoint: b"heddle-markers-v1\n".to_vec(),
1759                            is_complete: true,
1760                        }),
1761                    })),
1762                };
1763                let _ = tx.send(Ok(complete)).await;
1764            });
1765
1766            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
1767                rx,
1768            )))
1769        }
1770    }
1771
1772    async fn connect_sidecar_only_service(
1773        service: SidecarOnlyPullService,
1774    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
1775        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
1776            Ok(listener) => listener,
1777            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
1778                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
1779                return None;
1780            }
1781            Err(err) => panic!("bind test server: {err}"),
1782        };
1783        let addr = listener.local_addr().expect("local addr");
1784        let incoming = futures::stream::unfold(listener, |listener| async {
1785            match listener.accept().await {
1786                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
1787                Err(err) => Some((Err(err), listener)),
1788            }
1789        });
1790
1791        let handle = tokio::spawn(async move {
1792            Server::builder()
1793                .add_service(RepoSyncServiceServer::new(service))
1794                .serve_with_incoming(incoming)
1795                .await
1796                .expect("serve sidecar-only test service");
1797        });
1798
1799        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
1800            .await
1801            .expect("connect client");
1802        Some((client, handle))
1803    }
1804
1805    #[tokio::test]
1806    async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
1807        let (_dir, repo) = temp_repo();
1808        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1809        let state = State::new_snapshot(
1810            tree_hash,
1811            vec![],
1812            Attribution::human(Principal {
1813                name: "Grace Hopper".into(),
1814                email: "grace@example.com".into(),
1815            }),
1816        );
1817        let state_id = state.change_id;
1818        repo.store().put_state(&state).expect("put state");
1819        assert!(
1820            repo.get_state_visibility_bytes_for_state(&state_id)
1821                .expect("load local sidecar")
1822                .is_none(),
1823            "test starts with state present and StateVisibility sidecar absent"
1824        );
1825
1826        let state_visibility_blob =
1827            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
1828                .encode()
1829                .expect("encode state visibility blob");
1830        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1831            state: state_id,
1832            state_visibility_blob,
1833        })
1834        .await
1835        else {
1836            return;
1837        };
1838
1839        let exchange = tokio::time::timeout(
1840            Duration::from_secs(5),
1841            client.pull_exchange(
1842                &repo,
1843                "owner/repo",
1844                "main",
1845                PullOptions {
1846                    local_thread: None,
1847                    depth: None,
1848                    target_state: Some(state_id),
1849                    materialization: PullMaterialization::Full,
1850                },
1851            ),
1852        )
1853        .await
1854        .expect("sidecar-only pull must not hang waiting for native pack")
1855        .expect("sidecar-only pull succeeds");
1856        server.abort();
1857
1858        assert!(exchange.result.success);
1859        assert_eq!(exchange.object_count, 0);
1860        assert_eq!(exchange.profile.pack_bytes_received, 0);
1861        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
1862        assert!(
1863            repo.get_state_visibility_for_state(&state_id)
1864                .expect("load accepted sidecar")
1865                .has_record(),
1866            "pull must accept the out-of-pack StateVisibility sidecar"
1867        );
1868    }
1869
1870    // A sidecar blob larger than tonic's 4 MiB default decode limit but well
1871    // under the 64 MiB receive cap must still decode + install: the raised
1872    // `max_decoding_message_size` is the bound, and it isn't set too tight.
1873    #[tokio::test]
1874    async fn legitimate_large_sidecar_blob_decodes_and_installs() {
1875        let (_dir, repo) = temp_repo();
1876        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1877        let state = State::new_snapshot(
1878            tree_hash,
1879            vec![],
1880            Attribution::human(Principal {
1881                name: "Grace Hopper".into(),
1882                email: "grace@example.com".into(),
1883            }),
1884        );
1885        let state_id = state.change_id;
1886        repo.store().put_state(&state).expect("put state");
1887
1888        // ~8 MiB blob: above tonic's 4 MiB default (which would reject at
1889        // decode without the raised limit), below the 64 MiB sidecar cap.
1890        let mut record = sample_state_visibility(state_id);
1891        record.tier = VisibilityTier::Restricted {
1892            scope_label: "x".repeat(8 * 1024 * 1024),
1893        };
1894        let state_visibility_blob = StateVisibilityBlob::new(vec![record])
1895            .encode()
1896            .expect("encode large state visibility blob");
1897        assert!(
1898            state_visibility_blob.len() > 4 * 1024 * 1024,
1899            "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
1900        );
1901        assert!(
1902            (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
1903            "blob must stay within the legitimate sidecar receive cap"
1904        );
1905
1906        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1907            state: state_id,
1908            state_visibility_blob,
1909        })
1910        .await
1911        else {
1912            return;
1913        };
1914
1915        let exchange = tokio::time::timeout(
1916            Duration::from_secs(30),
1917            client.pull_exchange(
1918                &repo,
1919                "owner/repo",
1920                "main",
1921                PullOptions {
1922                    local_thread: None,
1923                    depth: None,
1924                    target_state: Some(state_id),
1925                    materialization: PullMaterialization::Full,
1926                },
1927            ),
1928        )
1929        .await
1930        .expect("large-sidecar pull must not hang")
1931        .expect("large but legitimate sidecar pull succeeds");
1932        server.abort();
1933
1934        assert!(exchange.result.success);
1935        assert!(
1936            repo.get_state_visibility_for_state(&state_id)
1937                .expect("load accepted sidecar")
1938                .has_record(),
1939            "pull must accept a legitimately-large StateVisibility sidecar"
1940        );
1941    }
1942
1943    // A sidecar blob beyond the pull-stream decode limit must be rejected at
1944    // the gRPC decode boundary — before its `Vec<u8>` is materialized — not by
1945    // the cheaper post-decode `check_received_transfer_blob_size` guard.
1946    #[tokio::test]
1947    async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
1948        let (_dir, repo) = temp_repo();
1949        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1950        let state = State::new_snapshot(
1951            tree_hash,
1952            vec![],
1953            Attribution::human(Principal {
1954                name: "Grace Hopper".into(),
1955                email: "grace@example.com".into(),
1956            }),
1957        );
1958        let state_id = state.change_id;
1959        repo.store().put_state(&state).expect("put state");
1960
1961        // One byte past the decode limit. Content is irrelevant: decode is
1962        // refused before the blob is ever handed to the accept path.
1963        let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
1964        let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1965            state: state_id,
1966            state_visibility_blob: oversized,
1967        })
1968        .await
1969        else {
1970            return;
1971        };
1972
1973        let result = tokio::time::timeout(
1974            Duration::from_secs(30),
1975            client.pull_exchange(
1976                &repo,
1977                "owner/repo",
1978                "main",
1979                PullOptions {
1980                    local_thread: None,
1981                    depth: None,
1982                    target_state: Some(state_id),
1983                    materialization: PullMaterialization::Full,
1984                },
1985            ),
1986        )
1987        .await
1988        .expect("oversized-sidecar pull must not hang");
1989        server.abort();
1990
1991        let err = match result {
1992            Err(err) => err,
1993            Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
1994        };
1995        let message = err.to_string();
1996        assert!(
1997            !message.contains("exceeds receive size limit"),
1998            "rejection must come from the decode-size limit, before the post-decode check: {message}"
1999        );
2000        assert!(
2001            repo.get_state_visibility_for_state(&state_id)
2002                .expect("load sidecar")
2003                .latest()
2004                .expect("resolve visibility")
2005                .is_none(),
2006            "an oversized sidecar must never be installed"
2007        );
2008    }
2009
2010    #[derive(Clone)]
2011    struct StateAndVisibilityPullService {
2012        state: ChangeId,
2013        pack_bundle: wire::NativePackBundle,
2014        state_visibility_blob: Vec<u8>,
2015    }
2016
2017    #[tonic::async_trait]
2018    impl RepoSyncService for StateAndVisibilityPullService {
2019        async fn list_refs(
2020            &self,
2021            _request: tonic::Request<ListRefsRequest>,
2022        ) -> Result<Response<ListRefsResponse>, Status> {
2023            Ok(Response::new(ListRefsResponse::default()))
2024        }
2025
2026        async fn update_ref(
2027            &self,
2028            _request: tonic::Request<UpdateRefRequest>,
2029        ) -> Result<Response<UpdateRefResponse>, Status> {
2030            Ok(Response::new(UpdateRefResponse::default()))
2031        }
2032
2033        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
2034
2035        async fn push(
2036            &self,
2037            _request: tonic::Request<tonic::Streaming<PushMessage>>,
2038        ) -> Result<Response<Self::PushStream>, Status> {
2039            let (_tx, rx) = mpsc::channel(1);
2040            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
2041                rx,
2042            )))
2043        }
2044
2045        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
2046
2047        async fn pull(
2048            &self,
2049            request: tonic::Request<tonic::Streaming<PullMessage>>,
2050        ) -> Result<Response<Self::PullStream>, Status> {
2051            let state = self.state;
2052            let pack_bundle = self.pack_bundle.clone();
2053            let state_visibility_blob = self.state_visibility_blob.clone();
2054            let (tx, rx) = mpsc::channel(8);
2055
2056            tokio::spawn(async move {
2057                let mut inbound = request.into_inner();
2058                match inbound.message().await {
2059                    Ok(Some(PullMessage {
2060                        body: Some(pull_message::Body::Request(_)),
2061                    })) => {}
2062                    other => {
2063                        let _ = tx
2064                            .send(Err(Status::invalid_argument(format!(
2065                                "expected pull request, got {other:?}"
2066                            ))))
2067                            .await;
2068                        return;
2069                    }
2070                }
2071
2072                let ready = PullMessage {
2073                    body: Some(pull_message::Body::Ready(PullReady {
2074                        remote_state: state.to_string_full(),
2075                        objects_to_fetch: vec![
2076                            object_descriptor_with_status(
2077                                &state_info(state),
2078                                ObjectAvailabilityStatus::Missing,
2079                                "missing state",
2080                            ),
2081                            object_descriptor_with_status(
2082                                &state_visibility_info(state),
2083                                ObjectAvailabilityStatus::Missing,
2084                                "missing state visibility",
2085                            ),
2086                        ],
2087                        transfer: None,
2088                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
2089                        missing_objects: Vec::new(),
2090                        full_closure_available: false,
2091                        object_count: 2,
2092                    })),
2093                };
2094                if tx.send(Ok(ready)).await.is_err() {
2095                    return;
2096                }
2097
2098                match inbound.message().await {
2099                    Ok(Some(PullMessage {
2100                        body: Some(pull_message::Body::Want(want)),
2101                    })) if !want.want_full_closure
2102                        && want.objects.len() == 2
2103                        && want
2104                            .objects
2105                            .iter()
2106                            .any(|object| object.object_type == "state")
2107                        && want
2108                            .objects
2109                            .iter()
2110                            .any(|object| object.object_type == "state_visibility") => {}
2111                    other => {
2112                        let _ = tx
2113                            .send(Err(Status::invalid_argument(format!(
2114                                "expected state + sidecar wants, got {other:?}"
2115                            ))))
2116                            .await;
2117                        return;
2118                    }
2119                }
2120
2121                for message in
2122                    encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
2123                {
2124                    if tx.send(Ok(message)).await.is_err() {
2125                        return;
2126                    }
2127                }
2128
2129                let transfer = PullMessage {
2130                    body: Some(pull_message::Body::StateVisibility(
2131                        StateVisibilityTransfer {
2132                            state_id: state.to_string_full(),
2133                            state_visibility_blob,
2134                        },
2135                    )),
2136                };
2137                if tx.send(Ok(transfer)).await.is_err() {
2138                    return;
2139                }
2140
2141                let complete = PullMessage {
2142                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
2143                        success: true,
2144                        new_state: state.to_string_full(),
2145                        error: String::new(),
2146                        transfer: Some(TransferCheckpoint {
2147                            transfer_id: "state-and-visibility-test".to_string(),
2148                            transport_mode: TransportMode::NativePack as i32,
2149                            resume_offset: 0,
2150                            chunk_index: 0,
2151                            checkpoint: b"heddle-markers-v1\n".to_vec(),
2152                            is_complete: true,
2153                        }),
2154                    })),
2155                };
2156                let _ = tx.send(Ok(complete)).await;
2157            });
2158
2159            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
2160                rx,
2161            )))
2162        }
2163    }
2164
2165    fn encode_pull_native_pack_messages(
2166        bundle: &wire::NativePackBundle,
2167        transfer_id: &str,
2168        chunk_size: usize,
2169    ) -> Vec<PullMessage> {
2170        let mut messages = Vec::new();
2171        let chunk_size = chunk_size.max(1);
2172
2173        let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
2174        for chunk_index in 0..pack_total_chunks.max(1) {
2175            let Some((start, len)) =
2176                wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
2177            else {
2178                break;
2179            };
2180            messages.push(PullMessage {
2181                body: Some(pull_message::Body::Pack(PackChunk {
2182                    stream_kind: PackStreamKind::Pack as i32,
2183                    data: bundle.pack_data[start..start + len].to_vec(),
2184                    transfer: Some(TransferCheckpoint {
2185                        transfer_id: transfer_id.to_string(),
2186                        transport_mode: TransportMode::NativePack as i32,
2187                        resume_offset: start as u64,
2188                        chunk_index: chunk_index as u32,
2189                        checkpoint: Vec::new(),
2190                        is_complete: chunk_index + 1 == pack_total_chunks,
2191                    }),
2192                    chunk_length: len as u32,
2193                    is_final_chunk: chunk_index + 1 == pack_total_chunks,
2194                })),
2195            });
2196        }
2197
2198        let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
2199        for chunk_index in 0..index_total_chunks.max(1) {
2200            let Some((start, len)) =
2201                wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
2202            else {
2203                break;
2204            };
2205            messages.push(PullMessage {
2206                body: Some(pull_message::Body::Pack(PackChunk {
2207                    stream_kind: PackStreamKind::Index as i32,
2208                    data: bundle.index_data[start..start + len].to_vec(),
2209                    transfer: Some(TransferCheckpoint {
2210                        transfer_id: transfer_id.to_string(),
2211                        transport_mode: TransportMode::NativePack as i32,
2212                        resume_offset: start as u64,
2213                        chunk_index: chunk_index as u32,
2214                        checkpoint: Vec::new(),
2215                        is_complete: chunk_index + 1 == index_total_chunks,
2216                    }),
2217                    chunk_length: len as u32,
2218                    is_final_chunk: chunk_index + 1 == index_total_chunks,
2219                })),
2220            });
2221        }
2222
2223        messages
2224    }
2225
2226    async fn connect_state_and_visibility_service(
2227        service: StateAndVisibilityPullService,
2228    ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
2229        let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
2230            Ok(listener) => listener,
2231            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
2232                eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
2233                return None;
2234            }
2235            Err(err) => panic!("bind test server: {err}"),
2236        };
2237        let addr = listener.local_addr().expect("local addr");
2238        let incoming = futures::stream::unfold(listener, |listener| async {
2239            match listener.accept().await {
2240                Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
2241                Err(err) => Some((Err(err), listener)),
2242            }
2243        });
2244
2245        let handle = tokio::spawn(async move {
2246            Server::builder()
2247                .add_service(RepoSyncServiceServer::new(service))
2248                .serve_with_incoming(incoming)
2249                .await
2250                .expect("serve state-and-visibility test service");
2251        });
2252
2253        let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
2254            .await
2255            .expect("connect client");
2256        Some((client, handle))
2257    }
2258
2259    #[tokio::test]
2260    async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
2261        let (_source_dir, source_repo) = temp_repo();
2262        let (_target_dir, target_repo) = temp_repo();
2263        let tree_hash = source_repo
2264            .store()
2265            .put_tree(&Tree::new())
2266            .expect("put tree");
2267        let state = State::new_snapshot(
2268            tree_hash,
2269            vec![],
2270            Attribution::human(Principal {
2271                name: "Grace Hopper".into(),
2272                email: "grace@example.com".into(),
2273            }),
2274        );
2275        let state_id = state.change_id;
2276        source_repo
2277            .store()
2278            .put_state(&state)
2279            .expect("put source state");
2280        let state_visibility_blob =
2281            StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
2282                .encode()
2283                .expect("encode state visibility blob");
2284        source_repo
2285            .accept_wire_state_visibility(state_id, &state_visibility_blob)
2286            .expect("put source state visibility");
2287        let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
2288            .expect("build state pack");
2289
2290        assert!(
2291            target_repo
2292                .store()
2293                .get_state(&state_id)
2294                .expect("load target state")
2295                .is_none(),
2296            "test starts with state absent"
2297        );
2298        assert!(
2299            target_repo
2300                .get_state_visibility_bytes_for_state(&state_id)
2301                .expect("load target sidecar")
2302                .is_none(),
2303            "test starts with StateVisibility sidecar absent"
2304        );
2305
2306        let Some((mut client, server)) =
2307            connect_state_and_visibility_service(StateAndVisibilityPullService {
2308                state: state_id,
2309                pack_bundle,
2310                state_visibility_blob,
2311            })
2312            .await
2313        else {
2314            return;
2315        };
2316
2317        let exchange = tokio::time::timeout(
2318            Duration::from_secs(5),
2319            client.pull_exchange(
2320                &target_repo,
2321                "owner/repo",
2322                "main",
2323                PullOptions {
2324                    local_thread: None,
2325                    depth: None,
2326                    target_state: Some(state_id),
2327                    materialization: PullMaterialization::Full,
2328                },
2329            ),
2330        )
2331        .await
2332        .expect("state + sidecar pull must not hang waiting for native pack")
2333        .expect("state + sidecar pull succeeds");
2334        server.abort();
2335
2336        assert!(exchange.result.success);
2337        assert_eq!(exchange.object_count, 1);
2338        assert!(exchange.profile.pack_bytes_received > 0);
2339        assert_eq!(exchange.profile.object_mix.states, 1);
2340        assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
2341        assert!(
2342            target_repo
2343                .store()
2344                .get_state(&state_id)
2345                .expect("load installed state")
2346                .is_some(),
2347            "native pack must install the State"
2348        );
2349        assert!(
2350            target_repo
2351                .get_state_visibility_for_state(&state_id)
2352                .expect("load accepted sidecar")
2353                .has_record(),
2354            "pull must accept the out-of-pack StateVisibility sidecar"
2355        );
2356    }
2357
2358    #[test]
2359    fn collect_missing_blobs_treats_absent_tree_as_empty() {
2360        let (_dir, repo) = temp_repo();
2361        let absent_tree = ContentHash::from_bytes([99u8; 32]);
2362
2363        let missing =
2364            collect_missing_blobs(&repo, &absent_tree).expect("absent tree is not an error");
2365
2366        assert!(missing.is_empty());
2367    }
2368
2369    #[test]
2370    fn collect_missing_blobs_reports_only_genuinely_missing_blobs() {
2371        let (_dir, repo) = temp_repo();
2372        let present_blob = Blob::from("already local");
2373        let present_hash = repo.store().put_blob(&present_blob).expect("put blob");
2374        let missing_hash = ContentHash::from_bytes([42u8; 32]);
2375        let tree = Tree::from_entries(vec![
2376            TreeEntry::file("local.txt", present_hash, false).expect("present entry"),
2377            TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry"),
2378        ]);
2379        let tree_hash = repo.store().put_tree(&tree).expect("put tree");
2380
2381        let missing = collect_missing_blobs(&repo, &tree_hash).expect("collect missing blobs");
2382
2383        assert_eq!(missing, vec![missing_hash]);
2384    }
2385
2386    #[test]
2387    fn collect_missing_blobs_reports_corrupt_tree_read() {
2388        let (_dir, repo) = temp_repo();
2389        let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
2390        std::fs::write(loose_tree_path(&repo, &tree_hash), [0xc1]).expect("corrupt tree");
2391        repo.store().clear_recent_caches();
2392
2393        let err = collect_missing_blobs(&repo, &tree_hash).expect_err("corrupt tree must fail");
2394
2395        assert!(matches!(err, ProtocolError::InvalidState(_)));
2396        assert!(
2397            err.to_string().contains(&format!(
2398                "load tree {} while collecting lazy hydration missing blobs",
2399                tree_hash.to_hex()
2400            )),
2401            "unexpected error: {err}"
2402        );
2403    }
2404
2405    #[test]
2406    fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
2407        let (_dir, repo) = temp_repo();
2408        let blob = sample_blob();
2409        repo.put_redaction(sample_redaction(blob))
2410            .expect("put redaction");
2411        let expected_bytes = repo
2412            .store()
2413            .get_redactions_bytes_for_blob(&blob)
2414            .expect("load sidecar")
2415            .expect("sidecar exists");
2416
2417        let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");
2418
2419        let Some(push_message::Body::Redaction(transfer)) = message.body else {
2420            panic!("expected redaction transfer");
2421        };
2422        assert_eq!(transfer.blob_hash, blob.to_hex());
2423        assert_eq!(transfer.redactions_blob, expected_bytes);
2424    }
2425
2426    #[test]
2427    fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
2428        let (_dir, repo) = temp_repo();
2429        let blob = sample_blob();
2430
2431        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("missing sidecar");
2432
2433        assert!(matches!(err, ProtocolError::InvalidState(_)));
2434        assert!(
2435            err.to_string().contains(&format!(
2436                "server wants redaction for blob {} but sender has no sidecar",
2437                blob.to_hex()
2438            )),
2439            "unexpected error: {err}"
2440        );
2441    }
2442
2443    #[test]
2444    fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
2445        let (_dir, repo) = temp_repo();
2446        let blob = sample_blob();
2447        let redaction_path = repo
2448            .heddle_dir()
2449            .join("redactions")
2450            .join(format!("{}.bin", blob.to_hex()));
2451        std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");
2452
2453        let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("load error");
2454
2455        assert!(matches!(err, ProtocolError::InvalidState(_)));
2456        assert!(
2457            err.to_string()
2458                .contains(&format!("load redactions sidecar for {}:", blob.to_hex())),
2459            "unexpected error: {err}"
2460        );
2461    }
2462
2463    #[test]
2464    fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
2465        let (_dir, repo) = temp_repo();
2466        let state = ChangeId::from_bytes([17u8; 16]);
2467        repo.put_state_visibility(sample_state_visibility(state))
2468            .expect("put state visibility");
2469        let expected_bytes = repo
2470            .get_state_visibility_bytes_for_state(&state)
2471            .expect("load sidecar")
2472            .expect("sidecar exists");
2473
2474        let message =
2475            state_visibility_push_message(&repo, state_visibility_info(state)).expect("message");
2476
2477        let Some(push_message::Body::StateVisibility(transfer)) = message.body else {
2478            panic!("expected state visibility transfer");
2479        };
2480        assert_eq!(transfer.state_id, state.to_string_full());
2481        assert_eq!(transfer.state_visibility_blob, expected_bytes);
2482    }
2483}