1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7 GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9 RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
10 ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
11 pull_message, push_message,
12};
13use objects::{
14 object::{ChangeId, ContentHash, MarkerName, ThreadName},
15 store::{ObjectStore, PackObjectId},
16};
17use wire::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
18use repo::{Repository, SyncedThreadMetadata, ThreadManager};
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::Request;
22
23use super::{
24 HostedGrpcClient, PullMaterialization,
25 helpers::{
26 descriptor_id, descriptor_id_from_info, object_descriptor_with_status,
27 parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
28 transport_mode_name,
29 },
30};
31
32#[derive(Clone, Copy)]
33struct PullOptions<'a> {
34 local_thread: Option<&'a str>,
35 depth: Option<u32>,
36 target_state: Option<ChangeId>,
37 materialization: PullMaterialization,
38}
39
40struct PullWantPlan {
41 wants: Vec<ObjectDescriptor>,
42 wanted_types: WantedTypes,
43 want_full_closure: bool,
44}
45
46type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
47
/// Per-type tally of objects received during a pull, used for profiling output.
///
/// Every counter starts at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Blob objects.
    pub blobs: usize,
    /// Tree objects.
    pub trees: usize,
    /// State objects.
    pub states: usize,
    /// Action objects.
    pub actions: usize,
    /// Redaction sidecars.
    pub redactions: usize,
    /// State-visibility sidecars.
    pub state_visibilities: usize,
}
57
58impl PullObjectMix {
59 fn record(&mut self, obj_type: ObjectType) {
60 match obj_type {
61 ObjectType::Blob => self.blobs += 1,
62 ObjectType::Tree => self.trees += 1,
63 ObjectType::State => self.states += 1,
64 ObjectType::Action => self.actions += 1,
65 ObjectType::Redaction => self.redactions += 1,
66 ObjectType::StateVisibility => self.state_visibilities += 1,
67 }
68 }
69
70 pub fn total(&self) -> usize {
71 self.blobs
72 + self.trees
73 + self.states
74 + self.actions
75 + self.redactions
76 + self.state_visibilities
77 }
78}
79
80#[derive(Debug, Clone, Default)]
81pub struct PullProfile {
82 pub ready_wait: Duration,
83 pub receive_and_apply: Duration,
84 pub decode: Duration,
85 pub store_receive_object: Duration,
86 pub metadata_sync: Duration,
87 pub pack_decode_apply: Duration,
88 pub raw_decode_apply: Duration,
89 pub pack_decode: Duration,
90 pub raw_decode: Duration,
91 pub bytes_received: usize,
92 pub pack_bytes_received: usize,
93 pub raw_bytes_received: usize,
94 pub objects_received: usize,
95 pub object_mix: PullObjectMix,
96}
97
98impl HostedGrpcClient {
99 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
100 let mut request = Request::new(ListRefsRequest {
101 repo_path: repo_path.to_string(),
102 });
103 self.apply_auth(&mut request)?;
104 let response = self
105 .inner
106 .list_refs(request)
107 .await
108 .map_err(status_to_protocol_error)?
109 .into_inner();
110 response
111 .refs
112 .into_iter()
113 .map(|entry| {
114 Ok(RefEntry {
115 name: entry.name,
116 change_id: ChangeId::try_from_slice(&entry.change_id)
117 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
118 is_thread: entry.is_thread,
119 })
120 })
121 .collect()
122 }
123
124 #[allow(clippy::too_many_arguments)]
125 pub async fn update_ref(
126 &mut self,
127 repo_path: &str,
128 name: &str,
129 is_thread: bool,
130 old_value: Option<ChangeId>,
131 new_value: ChangeId,
132 force: bool,
133 thread_metadata: Option<&SyncedThreadMetadata>,
134 ) -> Result<RefUpdated, ProtocolError> {
135 let mut request = Request::new(UpdateRefRequest {
136 repo_path: repo_path.to_string(),
137 name: name.to_string(),
138 is_thread,
139 force,
140 old_value: old_value
141 .map(|value| value.to_string_full())
142 .unwrap_or_default(),
143 new_value: new_value.to_string_full(),
144 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
145 client_operation_id: String::new(),
146 });
147 self.apply_auth(&mut request)?;
148 let response = self
149 .inner
150 .update_ref(request)
151 .await
152 .map_err(status_to_protocol_error)?
153 .into_inner();
154 Ok(RefUpdated {
155 success: response.success,
156 old_value: if response.old_value.is_empty() {
157 None
158 } else {
159 Some(
160 ChangeId::parse(&response.old_value)
161 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
162 )
163 },
164 error: (!response.error.is_empty()).then_some(response.error),
165 })
166 }
167
168 pub async fn push(
169 &mut self,
170 repo: &Repository,
171 repo_path: &str,
172 local_state: ChangeId,
173 target_thread: &str,
174 force: bool,
175 ) -> Result<PushComplete, ProtocolError> {
176 let _ = self.transport.chunk_size;
177 let _ = self.transport.resume_attempts;
178 let objects = wire::enumerate_state_closure(repo.store(), local_state)?;
179 let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
180 let transport_mode = preferred_transport_mode(&self.transport, objects.len());
181 let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
182 let request_message = PushMessage {
183 body: Some(push_message::Body::Request(PushRequest {
184 repo_path: repo_path.to_string(),
185 local_state: local_state.to_string_full(),
186 target_thread: target_thread.to_string(),
187 create_thread: true,
188 force,
189 objects: objects.iter().map(to_proto_object_info).collect(),
190 transfer: Some(self.transport.transfer_checkpoint_with_mode(
191 transfer_id.clone(),
192 transport_mode,
193 0,
194 0,
195 false,
196 )),
197 partial_fetch_status: partial_fetch_status_for_repo(repo),
198 allow_partial_fetch: true,
199 thread_metadata: thread_metadata
200 .map(|metadata| to_proto_thread_metadata(&metadata)),
201 client_operation_id: String::new(),
202 })),
203 };
204
205 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
206 tx.send(request_message).await.map_err(|_| {
207 ProtocolError::InvalidState("failed to initialize push stream".to_string())
208 })?;
209 let mut request = Request::new(ReceiverStream::new(rx));
210 self.apply_auth(&mut request)?;
211 let mut response = self
212 .inner
213 .push(request)
214 .await
215 .map_err(status_to_protocol_error)?
216 .into_inner();
217
218 let ready = match response.message().await.map_err(status_to_protocol_error)? {
219 Some(PushMessage {
220 body: Some(push_message::Body::Ready(ready)),
221 }) => ready,
222 _ => {
223 return Err(ProtocolError::InvalidState(
224 "expected PushReady from gRPC server".to_string(),
225 ));
226 }
227 };
228
229 let object_index = objects
230 .into_iter()
231 .map(|info| (descriptor_id_from_info(&info), info))
232 .collect::<HashMap<_, _>>();
233
234 let ready_transport_mode = ready
235 .transfer
236 .as_ref()
237 .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
238 .unwrap_or(transport_mode);
239 let wanted_infos = ready
240 .want_objects
241 .into_iter()
242 .map(|want| {
243 object_index
244 .get(&descriptor_id(&want))
245 .cloned()
246 .ok_or_else(|| {
247 ProtocolError::InvalidState("server requested unknown object".to_string())
248 })
249 })
250 .collect::<Result<Vec<_>, _>>()?;
251
252 let (wanted_sidecars, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
257 .into_iter()
258 .partition(|info| is_out_of_pack_transfer_object_type(info.obj_type));
259
260 if !wanted_packable.is_empty() {
261 let bundle = wire::build_native_pack(repo.store(), &wanted_packable)?;
262 for message in encode_native_pack_messages(
263 &bundle,
264 &transfer_id,
265 self.transport.chunk_size.max(1),
266 &self.transport,
267 ready_transport_mode,
268 )? {
269 tx.send(message).await.map_err(|_| {
270 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
271 })?;
272 }
273 }
274
275 for info in wanted_sidecars {
276 let message = sidecar_push_message(repo, info)?;
277 tx.send(message).await.map_err(|_| {
278 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
279 })?;
280 }
281 drop(tx);
282
283 let result = match response.message().await.map_err(status_to_protocol_error)? {
284 Some(PushMessage {
285 body: Some(push_message::Body::Complete(complete)),
286 }) => PushComplete {
287 success: complete.success,
288 new_state: if complete.new_state.is_empty() {
289 None
290 } else {
291 Some(
292 ChangeId::parse(&complete.new_state)
293 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
294 )
295 },
296 error: (!complete.error.is_empty()).then_some(complete.error),
297 transfer_id: complete
298 .transfer
299 .as_ref()
300 .map(|transfer| transfer.transfer_id.clone())
301 .unwrap_or_default(),
302 transport_mode: complete
303 .transfer
304 .as_ref()
305 .map(|transfer| transport_mode_name(transfer.transport_mode))
306 .unwrap_or("raw-objects")
307 .to_string(),
308 resume_offset: complete
309 .transfer
310 .as_ref()
311 .map(|transfer| transfer.resume_offset)
312 .unwrap_or_default(),
313 chunk_index: complete
314 .transfer
315 .as_ref()
316 .map(|transfer| transfer.chunk_index)
317 .unwrap_or_default(),
318 checkpoint: complete
319 .transfer
320 .as_ref()
321 .map(|transfer| transfer.checkpoint.clone())
322 .unwrap_or_default(),
323 is_complete: complete
324 .transfer
325 .as_ref()
326 .map(|transfer| transfer.is_complete)
327 .unwrap_or(false),
328 },
329 _ => {
330 return Err(ProtocolError::InvalidState(
331 "expected PushComplete from gRPC server".to_string(),
332 ));
333 }
334 };
335
336 if result.success {
337 self.sync_remote_markers(repo, repo_path, local_state)
338 .await?;
339 }
340 Ok(result)
341 }
342
343 pub async fn pull(
344 &mut self,
345 repo: &Repository,
346 repo_path: &str,
347 remote_thread: &str,
348 local_thread: Option<&str>,
349 ) -> Result<PullComplete, ProtocolError> {
350 self.pull_with_options(
351 repo,
352 repo_path,
353 remote_thread,
354 PullOptions {
355 local_thread,
356 depth: None,
357 target_state: None,
358 materialization: PullMaterialization::Full,
359 },
360 )
361 .await
362 }
363
364 pub async fn pull_profiled(
365 &mut self,
366 repo: &Repository,
367 repo_path: &str,
368 remote_thread: &str,
369 local_thread: Option<&str>,
370 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
371 self.pull_exchange(
372 repo,
373 repo_path,
374 remote_thread,
375 PullOptions {
376 local_thread,
377 depth: None,
378 target_state: None,
379 materialization: PullMaterialization::Full,
380 },
381 )
382 .await
383 .map(|exchange| (exchange.result, exchange.profile))
384 }
385
386 pub async fn pull_partial(
387 &mut self,
388 repo: &Repository,
389 repo_path: &str,
390 remote_thread: &str,
391 local_thread: Option<&str>,
392 ) -> Result<PullComplete, ProtocolError> {
393 self.pull_with_options(
394 repo,
395 repo_path,
396 remote_thread,
397 PullOptions {
398 local_thread,
399 depth: None,
400 target_state: None,
401 materialization: PullMaterialization::Lazy,
402 },
403 )
404 .await
405 }
406
407 pub async fn pull_with_depth_and_materialization(
408 &mut self,
409 repo: &Repository,
410 repo_path: &str,
411 remote_thread: &str,
412 local_thread: Option<&str>,
413 depth: Option<u32>,
414 materialization: PullMaterialization,
415 ) -> Result<PullComplete, ProtocolError> {
416 self.pull_with_options(
417 repo,
418 repo_path,
419 remote_thread,
420 PullOptions {
421 local_thread,
422 depth,
423 target_state: None,
424 materialization,
425 },
426 )
427 .await
428 }
429
430 pub async fn pull_with_depth(
431 &mut self,
432 repo: &Repository,
433 repo_path: &str,
434 remote_thread: &str,
435 local_thread: Option<&str>,
436 depth: Option<u32>,
437 ) -> Result<PullComplete, ProtocolError> {
438 self.pull_with_depth_and_materialization(
439 repo,
440 repo_path,
441 remote_thread,
442 local_thread,
443 depth,
444 PullMaterialization::Full,
445 )
446 .await
447 }
448
449 pub async fn fetch_state(
450 &mut self,
451 repo: &Repository,
452 repo_path: &str,
453 remote_thread: &str,
454 target_state: ChangeId,
455 ) -> Result<usize, ProtocolError> {
456 self.pull_exchange(
457 repo,
458 repo_path,
459 remote_thread,
460 PullOptions {
461 local_thread: None,
462 depth: None,
463 target_state: Some(target_state),
464 materialization: PullMaterialization::Full,
465 },
466 )
467 .await
468 .map(|exchange| exchange.object_count)
469 }
470
471 pub async fn fetch_state_partial(
472 &mut self,
473 repo: &Repository,
474 repo_path: &str,
475 remote_thread: &str,
476 target_state: ChangeId,
477 ) -> Result<usize, ProtocolError> {
478 self.pull_exchange(
479 repo,
480 repo_path,
481 remote_thread,
482 PullOptions {
483 local_thread: None,
484 depth: None,
485 target_state: Some(target_state),
486 materialization: PullMaterialization::Lazy,
487 },
488 )
489 .await
490 .map(|exchange| exchange.object_count)
491 }
492
493 pub async fn hydrate_blob_at_path(
494 &mut self,
495 repo: &Repository,
496 repo_path: &str,
497 reference: &str,
498 path: &str,
499 ) -> Result<objects::object::Blob, ProtocolError> {
500 let mut request = Request::new(GetBlobRequest {
501 repo_path: repo_path.to_string(),
502 r#ref: reference.to_string(),
503 path: path.to_string(),
504 });
505 self.apply_auth(&mut request)?;
506 let response = self
507 .content
508 .get_blob(request)
509 .await
510 .map_err(status_to_protocol_error)?
511 .into_inner();
512
513 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
514 let blob = objects::object::Blob::new(content);
515 repo.store().put_blob(&blob)?;
516 repo.clear_missing_blob(&blob.hash())?;
517 Ok(blob)
518 }
519
520 pub async fn hydrate_missing_blobs_for_state(
521 &mut self,
522 repo: &Repository,
523 repo_path: &str,
524 remote_thread: &str,
525 target_state: ChangeId,
526 ) -> Result<usize, ProtocolError> {
527 let exchange = self
528 .pull_exchange(
529 repo,
530 repo_path,
531 remote_thread,
532 PullOptions {
533 local_thread: None,
534 depth: None,
535 target_state: Some(target_state),
536 materialization: PullMaterialization::Full,
537 },
538 )
539 .await?;
540 clear_missing_blobs_for_state(repo, target_state)?;
541 Ok(exchange.object_count)
542 }
543
544 async fn pull_with_options(
545 &mut self,
546 repo: &Repository,
547 repo_path: &str,
548 remote_thread: &str,
549 options: PullOptions<'_>,
550 ) -> Result<PullComplete, ProtocolError> {
551 self.pull_exchange(repo, repo_path, remote_thread, options)
552 .await
553 .map(|exchange| exchange.result)
554 }
555
556 async fn pull_exchange(
557 &mut self,
558 repo: &Repository,
559 repo_path: &str,
560 remote_thread: &str,
561 options: PullOptions<'_>,
562 ) -> Result<PullExchange, ProtocolError> {
563 let exchange_start = Instant::now();
564 let mut exclude_states = Vec::new();
565 if let Some(local_thread) = options.local_thread
566 && let Some(head) = repo.refs().get_thread(&ThreadName::from(local_thread))?
567 {
568 exclude_states.push(head);
569 }
570 let allow_partial_fetch = options.materialization.allows_partial_fetch();
571 let fresh_full_pull =
572 supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
573
574 let transfer_id = pull_transfer_id(
575 repo_path,
576 remote_thread,
577 options.local_thread,
578 options.depth,
579 options.target_state,
580 );
581 let request_message = PullMessage {
582 body: Some(pull_message::Body::Request(PullRequest {
583 repo_path: repo_path.to_string(),
584 remote_thread: remote_thread.to_string(),
585 local_thread: options.local_thread.unwrap_or_default().to_string(),
586 target_state: options
587 .target_state
588 .map(|value| value.to_string_full())
589 .unwrap_or_default(),
590 depth: options.depth.unwrap_or_default(),
591 exclude_states: exclude_states
592 .iter()
593 .map(ChangeId::to_string_full)
594 .collect(),
595 transfer: Some(self.transport.transfer_checkpoint_with_mode(
596 transfer_id.clone(),
597 TransportMode::NativePack,
598 0,
599 0,
600 false,
601 )),
602 partial_fetch_status: partial_fetch_status_for_repo(repo),
603 allow_partial_fetch,
604 fresh_full_pull,
605 client_operation_id: String::new(),
606 })),
607 };
608
609 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
610 tx.send(request_message).await.map_err(|_| {
611 ProtocolError::InvalidState("failed to initialize pull stream".to_string())
612 })?;
613 let mut request = Request::new(ReceiverStream::new(rx));
614 self.apply_auth(&mut request)?;
615 let mut response = self
616 .inner
617 .pull(request)
618 .await
619 .map_err(status_to_protocol_error)?
620 .into_inner();
621
622 let ready = match response.message().await.map_err(status_to_protocol_error)? {
623 Some(PullMessage {
624 body: Some(pull_message::Body::Ready(ready)),
625 }) => ready,
626 _ => {
627 return Err(ProtocolError::InvalidState(
628 "expected PullReady from gRPC server".to_string(),
629 ));
630 }
631 };
632 let mut profile = PullProfile {
633 ready_wait: exchange_start.elapsed(),
634 ..PullProfile::default()
635 };
636 let remote_state = ChangeId::parse(&ready.remote_state)
637 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
638 let PullWantPlan {
639 wants,
640 wanted_types,
641 want_full_closure,
642 } = plan_pull_wants(
643 repo,
644 &remote_state,
645 ready.full_closure_available,
646 ready.objects_to_fetch,
647 allow_partial_fetch,
648 )?;
649 let native_pack_required = native_pack_required_for_pull(want_full_closure, &wanted_types);
650
651 tx.send(PullMessage {
652 body: Some(pull_message::Body::Want(WantObjects {
653 objects: wants.clone(),
654 want_full_closure,
655 transfer: Some(self.transport.transfer_checkpoint_with_mode(
656 transfer_id.clone(),
657 TransportMode::NativePack,
658 0,
659 0,
660 false,
661 )),
662 })),
663 })
664 .await
665 .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
666 drop(tx);
667
668 let receive_start = Instant::now();
669 let mut pack_state = wire::PackChunkState::default();
670 let mut received = 0usize;
671 while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
672 match message.body {
673 Some(pull_message::Body::Pack(chunk)) => {
674 profile.bytes_received =
675 profile.bytes_received.saturating_add(chunk.data.len());
676 profile.pack_bytes_received =
677 profile.pack_bytes_received.saturating_add(chunk.data.len());
678 let transfer = chunk.transfer.as_ref().ok_or_else(|| {
679 ProtocolError::InvalidState(
680 "native pack chunk missing transfer checkpoint".to_string(),
681 )
682 })?;
683 let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
684 .unwrap_or(PackStreamKind::Unspecified);
685 if stream_kind == PackStreamKind::Unspecified {
686 return Err(ProtocolError::InvalidState(
687 "native pack chunk missing stream kind".to_string(),
688 ));
689 }
690 let decode_start = Instant::now();
691 wire::receive_pack_chunk(
692 &mut pack_state,
693 stream_kind == PackStreamKind::Index,
694 transfer.resume_offset,
695 transfer.chunk_index,
696 transfer.is_complete,
697 &chunk.data,
698 chunk.is_final_chunk,
699 )?;
700 let decode_elapsed = decode_start.elapsed();
701 profile.pack_decode += decode_elapsed;
702 profile.pack_decode_apply += decode_elapsed;
703 profile.decode += decode_elapsed;
704 }
705 Some(pull_message::Body::Redaction(transfer)) => {
706 wire::check_received_transfer_blob_size(
712 transfer.redactions_blob.len(),
713 wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
714 "redactions",
715 )?;
716 profile.bytes_received = profile
717 .bytes_received
718 .saturating_add(transfer.redactions_blob.len());
719 profile.object_mix.record(ObjectType::Redaction);
720 let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
721 ProtocolError::InvalidState(format!(
722 "RedactionTransfer.blob_hash is not a valid content hash: {err}"
723 ))
724 })?;
725 let decode_start = Instant::now();
726 repo.accept_wire_redactions(blob, &transfer.redactions_blob)
727 .map_err(|err| {
728 ProtocolError::InvalidState(format!(
729 "accept_wire_redactions for blob {}: {err}",
730 transfer.blob_hash
731 ))
732 })?;
733 let decode_elapsed = decode_start.elapsed();
734 profile.store_receive_object += decode_elapsed;
735 }
736 Some(pull_message::Body::StateVisibility(transfer)) => {
737 wire::check_received_transfer_blob_size(
738 transfer.state_visibility_blob.len(),
739 wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
740 "state-visibility",
741 )?;
742 profile.bytes_received = profile
743 .bytes_received
744 .saturating_add(transfer.state_visibility_blob.len());
745 profile.object_mix.record(ObjectType::StateVisibility);
746 let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
747 ProtocolError::InvalidState(format!(
748 "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
749 ))
750 })?;
751 let decode_start = Instant::now();
752 repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
753 .map_err(|err| {
754 ProtocolError::InvalidState(format!(
755 "accept_wire_state_visibility for state {}: {err}",
756 transfer.state_id
757 ))
758 })?;
759 let decode_elapsed = decode_start.elapsed();
760 profile.store_receive_object += decode_elapsed;
761 }
762 Some(pull_message::Body::Complete(complete)) => {
763 profile.receive_and_apply = receive_start.elapsed();
764 let final_state = if complete.new_state.is_empty() {
765 None
766 } else {
767 Some(
768 ChangeId::parse(&complete.new_state)
769 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
770 )
771 };
772
773 if complete.success {
774 if native_pack_required {
775 if !pack_state.is_complete() {
776 return Err(ProtocolError::InvalidState(
777 "pull completed before native pack stream finished".to_string(),
778 ));
779 }
780 let store_start = Instant::now();
781 let installed_ids = wire::install_received_pack(
782 repo.store(),
783 &pack_state.pack_data,
784 &pack_state.index_data,
785 )?;
786 profile.store_receive_object += store_start.elapsed();
787 received = installed_ids.len();
788 for id in installed_ids {
789 match (id, wanted_packable_type(&wanted_types, &id)) {
790 (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
791 profile.object_mix.record(ObjectType::Blob);
792 repo.clear_missing_blob(&hash)?;
793 }
794 (_, Some(obj_type)) => {
795 profile.object_mix.record(obj_type);
796 }
797 (PackObjectId::ChangeId(_), None) => {
798 profile.object_mix.record(ObjectType::State);
799 }
800 (PackObjectId::Hash(hash), None) => {
801 let inferred =
802 infer_installed_hash_object_type(repo, &hash)?;
803 profile.object_mix.record(inferred);
804 }
805 }
806 }
807 }
808
809 let metadata_start = Instant::now();
810 if let Some(local_thread) = options.local_thread
811 && let Some(state) = final_state
812 {
813 repo.refs()
814 .set_thread(&ThreadName::from(local_thread), &state)?;
815 }
816 if let Some(state) = final_state
817 && allow_partial_fetch
818 {
819 mark_missing_blobs_for_state(repo, state)?;
820 } else if final_state.is_some() {
821 let _ = repo.clear_all_missing_blobs()?;
822 }
823 let synced_markers = complete
824 .transfer
825 .as_ref()
826 .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
827 .transpose()?
828 .unwrap_or(false);
829 if !synced_markers {
830 self.sync_local_markers(repo, repo_path).await?;
831 }
832 profile.metadata_sync = metadata_start.elapsed();
833 profile.objects_received = received;
834 return Ok(PullExchange {
835 result: PullComplete {
836 success: true,
837 final_state,
838 error: None,
839 transfer_id: complete
840 .transfer
841 .as_ref()
842 .map(|transfer| transfer.transfer_id.clone())
843 .unwrap_or_default(),
844 transport_mode: complete
845 .transfer
846 .as_ref()
847 .map(|transfer| transport_mode_name(transfer.transport_mode))
848 .unwrap_or("native-pack")
849 .to_string(),
850 resume_offset: complete
851 .transfer
852 .as_ref()
853 .map(|transfer| transfer.resume_offset)
854 .unwrap_or_default(),
855 chunk_index: complete
856 .transfer
857 .as_ref()
858 .map(|transfer| transfer.chunk_index)
859 .unwrap_or_default(),
860 checkpoint: complete
861 .transfer
862 .as_ref()
863 .map(|transfer| transfer.checkpoint.clone())
864 .unwrap_or_default(),
865 is_complete: complete
866 .transfer
867 .as_ref()
868 .map(|transfer| transfer.is_complete)
869 .unwrap_or(false),
870 },
871 object_count: received,
872 profile,
873 });
874 }
875
876 profile.objects_received = received;
877 return Ok(PullExchange {
878 result: PullComplete {
879 success: false,
880 final_state,
881 error: (!complete.error.is_empty()).then_some(complete.error),
882 transfer_id: complete
883 .transfer
884 .as_ref()
885 .map(|transfer| transfer.transfer_id.clone())
886 .unwrap_or_default(),
887 transport_mode: complete
888 .transfer
889 .as_ref()
890 .map(|transfer| transport_mode_name(transfer.transport_mode))
891 .unwrap_or("native-pack")
892 .to_string(),
893 resume_offset: complete
894 .transfer
895 .as_ref()
896 .map(|transfer| transfer.resume_offset)
897 .unwrap_or_default(),
898 chunk_index: complete
899 .transfer
900 .as_ref()
901 .map(|transfer| transfer.chunk_index)
902 .unwrap_or_default(),
903 checkpoint: complete
904 .transfer
905 .as_ref()
906 .map(|transfer| transfer.checkpoint.clone())
907 .unwrap_or_default(),
908 is_complete: complete
909 .transfer
910 .as_ref()
911 .map(|transfer| transfer.is_complete)
912 .unwrap_or(false),
913 },
914 object_count: received,
915 profile,
916 });
917 }
918 _ => {}
919 }
920 }
921
922 Err(ProtocolError::InvalidState(format!(
923 "pull stream ended unexpectedly after receiving {received} packed objects"
924 )))
925 }
926}
927
928fn redaction_push_message(
929 repo: &Repository,
930 info: wire::ObjectInfo,
931) -> Result<PushMessage, ProtocolError> {
932 let wire::ObjectId::Hash(blob) = info.id else {
933 return Err(ProtocolError::InvalidState(
934 "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
935 ));
936 };
937 let hex = blob.to_hex();
938 let bytes = repo
943 .store()
944 .get_redactions_bytes_for_blob(&blob)
945 .map_err(|err| {
946 ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
947 })?
948 .ok_or_else(|| {
949 ProtocolError::InvalidState(format!(
950 "server wants redaction for blob {} but sender has no sidecar",
951 hex
952 ))
953 })?;
954 Ok(PushMessage {
955 body: Some(push_message::Body::Redaction(RedactionTransfer {
956 blob_hash: hex,
957 redactions_blob: bytes,
958 })),
959 })
960}
961
962fn is_out_of_pack_transfer_object_type(obj_type: ObjectType) -> bool {
963 matches!(
964 obj_type,
965 ObjectType::Redaction | ObjectType::StateVisibility
966 )
967}
968
969fn native_pack_required_for_pull(want_full_closure: bool, wanted_types: &WantedTypes) -> bool {
970 want_full_closure
971 || wanted_types
972 .values()
973 .flatten()
974 .copied()
975 .any(wire::is_native_packable_object_type)
976}
977
978fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
979 let types = wanted_types.entry(pack_id).or_default();
980 if !types.contains(&obj_type) {
981 types.push(obj_type);
982 }
983}
984
985fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
986 wanted_types.get(pack_id).and_then(|types| {
987 types
988 .iter()
989 .copied()
990 .find(|obj_type| wire::is_native_packable_object_type(*obj_type))
991 })
992}
993
994fn sidecar_push_message(
995 repo: &Repository,
996 info: wire::ObjectInfo,
997) -> Result<PushMessage, ProtocolError> {
998 match info.obj_type {
999 ObjectType::Redaction => redaction_push_message(repo, info),
1000 ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1001 obj_type => Err(ProtocolError::InvalidState(format!(
1002 "{obj_type:?} is not an out-of-pack sidecar object"
1003 ))),
1004 }
1005}
1006
1007fn state_visibility_push_message(
1008 repo: &Repository,
1009 info: wire::ObjectInfo,
1010) -> Result<PushMessage, ProtocolError> {
1011 let wire::ObjectId::ChangeId(state) = info.id else {
1012 return Err(ProtocolError::InvalidState(
1013 "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1014 ));
1015 };
1016 let state_id = state.to_string_full();
1017 let bytes = repo
1018 .get_state_visibility_bytes_for_state(&state)
1019 .map_err(|err| {
1020 ProtocolError::InvalidState(format!(
1021 "load state-visibility sidecar for {}: {err}",
1022 state_id
1023 ))
1024 })?
1025 .ok_or_else(|| {
1026 ProtocolError::InvalidState(format!(
1027 "server wants state visibility for state {} but sender has no sidecar",
1028 state_id
1029 ))
1030 })?;
1031 Ok(PushMessage {
1032 body: Some(push_message::Body::StateVisibility(
1033 StateVisibilityTransfer {
1034 state_id,
1035 state_visibility_blob: bytes,
1036 },
1037 )),
1038 })
1039}
1040
1041fn load_thread_metadata(
1042 repo: &Repository,
1043 target_thread: &str,
1044 local_state: ChangeId,
1045) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1046 let thread_manager = ThreadManager::new(repo.heddle_dir());
1047 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1048}
1049
1050fn plan_pull_wants(
1051 repo: &Repository,
1052 remote_state: &ChangeId,
1053 full_closure_available: bool,
1054 objects_to_fetch: Vec<ObjectDescriptor>,
1055 allow_partial_fetch: bool,
1056) -> Result<PullWantPlan, ProtocolError> {
1057 if full_closure_available {
1058 return Ok(PullWantPlan {
1059 wants: Vec::new(),
1060 wanted_types: HashMap::new(),
1061 want_full_closure: true,
1062 });
1063 }
1064 let request_full_closure =
1065 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1066 let mut wants = Vec::with_capacity(objects_to_fetch.len());
1067 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1068
1069 for descriptor in objects_to_fetch {
1070 let info = parse_descriptor_to_info(descriptor)?;
1071 let pack_id = match &info.id {
1072 wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1073 wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1074 };
1075 let include = if request_full_closure {
1076 true
1077 } else {
1078 let has = wire::has_object(repo.store(), &info)?;
1079 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1080 };
1081
1082 if include {
1083 record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1084 wants.push(object_descriptor_with_status(
1085 &info,
1086 ObjectAvailabilityStatus::Missing,
1087 "requested by client",
1088 ));
1089 }
1090 }
1091
1092 Ok(PullWantPlan {
1093 wants,
1094 wanted_types,
1095 want_full_closure: false,
1096 })
1097}
1098
1099fn supports_compact_full_pull(
1100 repo: &Repository,
1101 allow_partial_fetch: bool,
1102 exclude_states: &[ChangeId],
1103) -> Result<bool, ProtocolError> {
1104 if allow_partial_fetch || !exclude_states.is_empty() {
1105 return Ok(false);
1106 }
1107 repo_looks_fresh(repo)
1108}
1109
1110fn should_request_full_closure(
1111 repo: &Repository,
1112 remote_state: &ChangeId,
1113 allow_partial_fetch: bool,
1114) -> Result<bool, ProtocolError> {
1115 if allow_partial_fetch || repo.store().has_state(remote_state)? {
1116 return Ok(false);
1117 }
1118 repo_looks_fresh(repo)
1119}
1120
1121fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1122 if repo.head()?.is_some() {
1123 return Ok(false);
1124 }
1125 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1126 return Ok(false);
1127 }
1128 Ok(repo.missing_blobs()?.is_empty())
1129}
1130
1131fn infer_installed_hash_object_type(
1132 repo: &Repository,
1133 hash: &ContentHash,
1134) -> Result<ObjectType, ProtocolError> {
1135 let store = repo.store();
1136 if store.get_tree(hash)?.is_some() {
1137 return Ok(ObjectType::Tree);
1138 }
1139 if store
1140 .get_action(&objects::object::ActionId::from_hash(*hash))?
1141 .is_some()
1142 {
1143 return Ok(ObjectType::Action);
1144 }
1145 Ok(ObjectType::Blob)
1146}
1147
1148fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1149 const HEADER: &str = "heddle-markers-v1\n";
1150 if checkpoint.is_empty() {
1151 return Ok(false);
1152 }
1153 let payload = std::str::from_utf8(checkpoint)
1154 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1155 let Some(lines) = payload.strip_prefix(HEADER) else {
1156 return Ok(false);
1157 };
1158
1159 for line in lines.lines() {
1160 if line.is_empty() {
1161 continue;
1162 }
1163 let Some((name, change_id)) = line.split_once('\t') else {
1164 return Err(ProtocolError::InvalidState(
1165 "invalid marker snapshot line".to_string(),
1166 ));
1167 };
1168 let change_id = ChangeId::parse(change_id)
1169 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1170 if !repo.store().has_state(&change_id)? {
1171 continue;
1172 }
1173 let name = MarkerName::from(name);
1174 match repo.refs().get_marker(&name)? {
1175 Some(existing) if existing == change_id => {}
1176 Some(existing) => repo.refs().set_marker_cas(
1177 &name,
1178 refs::RefExpectation::Value(existing),
1179 &change_id,
1180 )?,
1181 None => repo.refs().create_marker(&name, &change_id)?,
1182 }
1183 }
1184
1185 Ok(true)
1186}
1187
1188fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1189 if s.is_empty() {
1190 return Vec::new();
1191 }
1192 objects::object::ChangeId::parse(s)
1193 .map(|id| id.as_bytes().to_vec())
1194 .unwrap_or_default()
1195}
1196
1197fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1198 ThreadMetadata {
1199 name: metadata.thread.clone(),
1200 target_thread: metadata.target_thread.clone(),
1201 parent_thread: metadata.parent_thread.clone(),
1202 task: metadata.task.clone(),
1203 thread_mode: metadata.mode.to_string(),
1204 thread_state: metadata.state.to_string(),
1205 freshness: metadata.freshness.to_string(),
1206 base_state: change_id_string_to_bytes(&metadata.base_state),
1207 base_root: change_id_string_to_bytes(&metadata.base_root),
1208 current_state: metadata
1209 .current_state
1210 .as_deref()
1211 .map(change_id_string_to_bytes),
1212 merged_state: metadata
1213 .merged_state
1214 .as_deref()
1215 .map(change_id_string_to_bytes),
1216 changed_paths: metadata.changed_paths.clone(),
1217 impact_categories: metadata
1218 .impact_categories
1219 .iter()
1220 .map(ToString::to_string)
1221 .collect(),
1222 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1223 promotion_suggested: metadata.promotion_suggested,
1224 verification_summary: Some(ThreadVerificationSummary {
1225 tests_passed: metadata.verification_summary.tests_passed,
1226 tests_failed: metadata
1227 .verification_summary
1228 .tests_failed
1229 .unwrap_or_default(),
1230 coverage_pct: metadata.verification_summary.coverage_pct,
1231 lint_warnings: metadata.verification_summary.lint_warnings,
1232 }),
1233 confidence_summary: Some(ThreadConfidenceSummary {
1234 value: metadata.confidence_summary.value,
1235 band: metadata
1236 .confidence_summary
1237 .band
1238 .as_ref()
1239 .map(ToString::to_string),
1240 }),
1241 integration_policy_result: Some(ThreadIntegrationPolicy {
1242 status: metadata
1243 .integration_policy_result
1244 .status
1245 .clone()
1246 .unwrap_or_default(),
1247 reason: metadata
1248 .integration_policy_result
1249 .reason
1250 .clone()
1251 .unwrap_or_default(),
1252 }),
1253 created_at: Some(prost_types::Timestamp {
1254 seconds: metadata.created_at.timestamp(),
1255 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1256 }),
1257 updated_at: Some(prost_types::Timestamp {
1258 seconds: metadata.updated_at.timestamp(),
1259 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1260 }),
1261 }
1262}
1263
1264struct PullExchange {
1265 result: PullComplete,
1266 object_count: usize,
1267 profile: PullProfile,
1268}
1269
1270fn mark_missing_blobs_for_state(
1271 repo: &Repository,
1272 state_id: ChangeId,
1273) -> Result<(), ProtocolError> {
1274 let state = repo
1275 .store()
1276 .get_state(&state_id)?
1277 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1278 let mut missing = collect_missing_blobs(repo, &state.tree)?;
1279 if let Some(context_root) = state.context.as_ref() {
1280 missing.extend(collect_missing_blobs(repo, context_root)?);
1281 }
1282 missing
1283 .into_iter()
1284 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1285}
1286
1287fn clear_missing_blobs_for_state(
1288 repo: &Repository,
1289 state_id: ChangeId,
1290) -> Result<(), ProtocolError> {
1291 let state = repo
1292 .store()
1293 .get_state(&state_id)?
1294 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1295 let mut missing = collect_missing_blobs(repo, &state.tree)?;
1296 if let Some(context_root) = state.context.as_ref() {
1297 missing.extend(collect_missing_blobs(repo, context_root)?);
1298 }
1299 missing
1300 .into_iter()
1301 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1302}
1303
1304fn collect_missing_blobs(
1305 repo: &Repository,
1306 tree_hash: &ContentHash,
1307) -> Result<Vec<ContentHash>, ProtocolError> {
1308 let mut missing = Vec::new();
1309 collect_missing_blobs_recursive(repo, tree_hash, &mut missing)?;
1310 Ok(missing)
1311}
1312
1313fn collect_missing_blobs_recursive(
1314 repo: &Repository,
1315 tree_hash: &ContentHash,
1316 missing: &mut Vec<ContentHash>,
1317) -> Result<(), ProtocolError> {
1318 let Some(tree) = repo.store().get_tree(tree_hash).map_err(|err| {
1319 ProtocolError::InvalidState(format!(
1320 "load tree {} while collecting lazy hydration missing blobs: {err}",
1321 tree_hash.to_hex()
1322 ))
1323 })?
1324 else {
1325 return Ok(());
1326 };
1327 for entry in tree.entries() {
1328 match entry.entry_type {
1329 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1330 if !repo.store().has_blob(&entry.hash).map_err(|err| {
1331 ProtocolError::InvalidState(format!(
1332 "check blob {} while collecting lazy hydration missing blobs: {err}",
1333 entry.hash.to_hex()
1334 ))
1335 })? {
1336 missing.push(entry.hash);
1337 }
1338 }
1339 objects::object::EntryType::Tree => {
1340 collect_missing_blobs_recursive(repo, &entry.hash, missing)?;
1341 }
1342 }
1343 }
1344 Ok(())
1345}
1346
1347fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1348 match repo.missing_blobs() {
1349 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1350 Ok(_) => PartialFetchStatus::Disabled as i32,
1351 Err(_) => PartialFetchStatus::Unspecified as i32,
1352 }
1353}
1354
1355fn pull_transfer_id(
1356 repo_path: &str,
1357 remote_thread: &str,
1358 local_thread: Option<&str>,
1359 depth: Option<u32>,
1360 target_state: Option<ChangeId>,
1361) -> String {
1362 format!(
1363 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1364 local_thread.unwrap_or_default(),
1365 target_state
1366 .map(|value| value.to_string_full())
1367 .unwrap_or_default()
1368 )
1369}
1370
1371fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1372 format!(
1373 "push:{repo_path}:{}:{target_thread}",
1374 local_state.to_string_full()
1375 )
1376}
1377
1378fn encode_native_pack_messages(
1379 bundle: &wire::NativePackBundle,
1380 transfer_id: &str,
1381 chunk_size: usize,
1382 transport: &super::helpers::HostedTransportPolicy,
1383 transport_mode: TransportMode,
1384) -> Result<Vec<PushMessage>, ProtocolError> {
1385 let mut messages = Vec::new();
1386 let chunk_size = chunk_size.max(1);
1387
1388 let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
1389 for chunk_index in 0..pack_total_chunks.max(1) {
1390 let Some((start, len)) =
1391 wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1392 else {
1393 break;
1394 };
1395 messages.push(PushMessage {
1396 body: Some(push_message::Body::Pack(PackChunk {
1397 stream_kind: PackStreamKind::Pack as i32,
1398 data: bundle.pack_data[start..start + len].to_vec(),
1399 transfer: Some(transport.transfer_checkpoint_with_mode(
1400 transfer_id,
1401 transport_mode,
1402 chunk_index as u32,
1403 start as u64,
1404 chunk_index + 1 == pack_total_chunks,
1405 )),
1406 chunk_length: len as u32,
1407 is_final_chunk: chunk_index + 1 == pack_total_chunks,
1408 })),
1409 });
1410 }
1411
1412 let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
1413 for chunk_index in 0..index_total_chunks.max(1) {
1414 let Some((start, len)) =
1415 wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1416 else {
1417 break;
1418 };
1419 messages.push(PushMessage {
1420 body: Some(push_message::Body::Pack(PackChunk {
1421 stream_kind: PackStreamKind::Index as i32,
1422 data: bundle.index_data[start..start + len].to_vec(),
1423 transfer: Some(transport.transfer_checkpoint_with_mode(
1424 transfer_id,
1425 transport_mode,
1426 chunk_index as u32,
1427 start as u64,
1428 chunk_index + 1 == index_total_chunks,
1429 )),
1430 chunk_length: len as u32,
1431 is_final_chunk: chunk_index + 1 == index_total_chunks,
1432 })),
1433 });
1434 }
1435 Ok(messages)
1436}
1437
1438fn preferred_transport_mode(
1439 transport: &super::helpers::HostedTransportPolicy,
1440 object_count: usize,
1441) -> TransportMode {
1442 let _ = transport;
1443 let _ = object_count;
1444 TransportMode::NativePack
1445}
1446
1447#[cfg(test)]
1448mod tests {
1449 use chrono::{TimeZone, Utc};
1450 use cli_shared::ClientConfig;
1451 use grpc::heddle::v1::{
1452 ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
1453 TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
1454 repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
1455 };
1456 use objects::{
1457 object::{
1458 Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
1459 StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
1460 },
1461 store::ObjectStore,
1462 };
1463 use wire::{ObjectId, ObjectInfo};
1464 use tempfile::TempDir;
1465 use tonic::{Response, Status, transport::Server};
1466
1467 use super::*;
1468
1469 fn temp_repo() -> (TempDir, Repository) {
1470 let dir = TempDir::new().expect("tempdir");
1471 let repo = Repository::init_default(dir.path()).expect("init repo");
1472 (dir, repo)
1473 }
1474
1475 fn redaction_info(blob: ContentHash) -> ObjectInfo {
1476 ObjectInfo {
1477 id: ObjectId::Hash(blob),
1478 obj_type: ObjectType::Redaction,
1479 size: 0,
1480 delta_base: None,
1481 }
1482 }
1483
1484 fn state_info(state: ChangeId) -> ObjectInfo {
1485 ObjectInfo {
1486 id: ObjectId::ChangeId(state),
1487 obj_type: ObjectType::State,
1488 size: 0,
1489 delta_base: None,
1490 }
1491 }
1492
1493 fn state_visibility_info(state: ChangeId) -> ObjectInfo {
1494 ObjectInfo {
1495 id: ObjectId::ChangeId(state),
1496 obj_type: ObjectType::StateVisibility,
1497 size: 0,
1498 delta_base: None,
1499 }
1500 }
1501
1502 fn sample_blob() -> ContentHash {
1503 ContentHash::from_bytes([7u8; 32])
1504 }
1505
1506 #[test]
1507 fn descriptor_id_from_info_matches_proto_encode_path() {
1508 let infos = [
1509 redaction_info(sample_blob()),
1510 state_info(ChangeId::from_bytes([3u8; 16])),
1511 state_visibility_info(ChangeId::from_bytes([9u8; 16])),
1512 ];
1513 for info in infos {
1514 assert_eq!(
1515 descriptor_id_from_info(&info),
1516 descriptor_id(&to_proto_object_info(&info)),
1517 "keying path must stay byte-identical to the throwaway-encode path",
1518 );
1519 }
1520 }
1521
1522 fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
1523 let hex = hash.to_hex();
1524 let (prefix, rest) = hex.split_at(2);
1525 repo.heddle_dir()
1526 .join("objects")
1527 .join("trees")
1528 .join(prefix)
1529 .join(rest)
1530 }
1531
1532 fn sample_redaction(blob: ContentHash) -> Redaction {
1533 Redaction {
1534 redacted_blob: blob,
1535 state: ChangeId::from_bytes([1u8; 16]),
1536 path: "config/secrets.toml".into(),
1537 reason: "leaked credential".into(),
1538 redactor: Principal {
1539 name: "Grace Hopper".into(),
1540 email: "grace@example.com".into(),
1541 },
1542 redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
1543 signature: None,
1544 purged_at: None,
1545 supersedes: None,
1546 }
1547 }
1548
1549 fn sample_state_visibility(state: ChangeId) -> StateVisibility {
1550 StateVisibility {
1551 state,
1552 tier: VisibilityTier::Restricted {
1553 scope_label: "security-embargo".into(),
1554 },
1555 embargo_until: None,
1556 declarer: Principal {
1557 name: "Grace Hopper".into(),
1558 email: "grace@example.com".into(),
1559 },
1560 declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
1561 signature: None,
1562 supersedes: None,
1563 }
1564 }
1565
1566 #[test]
1567 fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
1568 for obj_type in wire::native_pack_excluded_object_types() {
1569 assert!(
1570 is_out_of_pack_transfer_object_type(*obj_type),
1571 "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
1572 );
1573 }
1574 }
1575
1576 #[test]
1577 fn native_pack_required_tracks_packable_pull_wants() {
1578 let blob = sample_blob();
1579 let state = ChangeId::from_bytes([9u8; 16]);
1580
1581 let sidecar_only = HashMap::from([(
1582 PackObjectId::ChangeId(state),
1583 vec![ObjectType::StateVisibility],
1584 )]);
1585 assert!(!native_pack_required_for_pull(false, &sidecar_only));
1586
1587 let redaction_only =
1588 HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Redaction])]);
1589 assert!(!native_pack_required_for_pull(false, &redaction_only));
1590
1591 let packable = HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Blob])]);
1592 assert!(native_pack_required_for_pull(false, &packable));
1593
1594 let state_with_sidecar = HashMap::from([(
1595 PackObjectId::ChangeId(state),
1596 vec![ObjectType::State, ObjectType::StateVisibility],
1597 )]);
1598 assert!(native_pack_required_for_pull(false, &state_with_sidecar));
1599 assert!(native_pack_required_for_pull(true, &HashMap::new()));
1600 }
1601
1602 #[test]
1603 fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
1604 let (_dir, repo) = temp_repo();
1605 let state = ChangeId::from_bytes([9u8; 16]);
1606 let plan = plan_pull_wants(
1607 &repo,
1608 &state,
1609 false,
1610 vec![
1611 object_descriptor_with_status(
1612 &state_info(state),
1613 ObjectAvailabilityStatus::Missing,
1614 "missing state",
1615 ),
1616 object_descriptor_with_status(
1617 &state_visibility_info(state),
1618 ObjectAvailabilityStatus::Missing,
1619 "missing state visibility",
1620 ),
1621 ],
1622 false,
1623 )
1624 .expect("plan pull wants");
1625
1626 let wanted = plan
1627 .wanted_types
1628 .get(&PackObjectId::ChangeId(state))
1629 .expect("same ChangeId want entry");
1630 assert_eq!(
1631 wanted.as_slice(),
1632 &[ObjectType::State, ObjectType::StateVisibility]
1633 );
1634 assert!(native_pack_required_for_pull(
1635 plan.want_full_closure,
1636 &plan.wanted_types
1637 ));
1638 }
1639
1640 #[derive(Clone)]
1641 struct SidecarOnlyPullService {
1642 state: ChangeId,
1643 state_visibility_blob: Vec<u8>,
1644 }
1645
1646 #[tonic::async_trait]
1647 impl RepoSyncService for SidecarOnlyPullService {
1648 async fn list_refs(
1649 &self,
1650 _request: tonic::Request<ListRefsRequest>,
1651 ) -> Result<Response<ListRefsResponse>, Status> {
1652 Ok(Response::new(ListRefsResponse::default()))
1653 }
1654
1655 async fn update_ref(
1656 &self,
1657 _request: tonic::Request<UpdateRefRequest>,
1658 ) -> Result<Response<UpdateRefResponse>, Status> {
1659 Ok(Response::new(UpdateRefResponse::default()))
1660 }
1661
1662 type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
1663
1664 async fn push(
1665 &self,
1666 _request: tonic::Request<tonic::Streaming<PushMessage>>,
1667 ) -> Result<Response<Self::PushStream>, Status> {
1668 let (_tx, rx) = mpsc::channel(1);
1669 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
1670 rx,
1671 )))
1672 }
1673
1674 type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
1675
1676 async fn pull(
1677 &self,
1678 request: tonic::Request<tonic::Streaming<PullMessage>>,
1679 ) -> Result<Response<Self::PullStream>, Status> {
1680 let state = self.state;
1681 let state_visibility_blob = self.state_visibility_blob.clone();
1682 let (tx, rx) = mpsc::channel(4);
1683
1684 tokio::spawn(async move {
1685 let mut inbound = request.into_inner();
1686 match inbound.message().await {
1687 Ok(Some(PullMessage {
1688 body: Some(pull_message::Body::Request(_)),
1689 })) => {}
1690 other => {
1691 let _ = tx
1692 .send(Err(Status::invalid_argument(format!(
1693 "expected pull request, got {other:?}"
1694 ))))
1695 .await;
1696 return;
1697 }
1698 }
1699
1700 let descriptor = object_descriptor_with_status(
1701 &state_visibility_info(state),
1702 ObjectAvailabilityStatus::Missing,
1703 "missing state visibility",
1704 );
1705 let ready = PullMessage {
1706 body: Some(pull_message::Body::Ready(PullReady {
1707 remote_state: state.to_string_full(),
1708 objects_to_fetch: vec![descriptor],
1709 transfer: None,
1710 partial_fetch_status: PartialFetchStatus::Disabled as i32,
1711 missing_objects: Vec::new(),
1712 full_closure_available: false,
1713 object_count: 1,
1714 })),
1715 };
1716 if tx.send(Ok(ready)).await.is_err() {
1717 return;
1718 }
1719
1720 match inbound.message().await {
1721 Ok(Some(PullMessage {
1722 body: Some(pull_message::Body::Want(want)),
1723 })) if !want.want_full_closure
1724 && want.objects.len() == 1
1725 && want.objects[0].object_type == "state_visibility" => {}
1726 other => {
1727 let _ = tx
1728 .send(Err(Status::invalid_argument(format!(
1729 "expected sidecar-only want, got {other:?}"
1730 ))))
1731 .await;
1732 return;
1733 }
1734 }
1735
1736 let transfer = PullMessage {
1737 body: Some(pull_message::Body::StateVisibility(
1738 StateVisibilityTransfer {
1739 state_id: state.to_string_full(),
1740 state_visibility_blob,
1741 },
1742 )),
1743 };
1744 if tx.send(Ok(transfer)).await.is_err() {
1745 return;
1746 }
1747
1748 let complete = PullMessage {
1749 body: Some(pull_message::Body::Complete(GrpcPullComplete {
1750 success: true,
1751 new_state: state.to_string_full(),
1752 error: String::new(),
1753 transfer: Some(TransferCheckpoint {
1754 transfer_id: "sidecar-only-test".to_string(),
1755 transport_mode: TransportMode::NativePack as i32,
1756 resume_offset: 0,
1757 chunk_index: 0,
1758 checkpoint: b"heddle-markers-v1\n".to_vec(),
1759 is_complete: true,
1760 }),
1761 })),
1762 };
1763 let _ = tx.send(Ok(complete)).await;
1764 });
1765
1766 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
1767 rx,
1768 )))
1769 }
1770 }
1771
1772 async fn connect_sidecar_only_service(
1773 service: SidecarOnlyPullService,
1774 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
1775 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
1776 Ok(listener) => listener,
1777 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
1778 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
1779 return None;
1780 }
1781 Err(err) => panic!("bind test server: {err}"),
1782 };
1783 let addr = listener.local_addr().expect("local addr");
1784 let incoming = futures::stream::unfold(listener, |listener| async {
1785 match listener.accept().await {
1786 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
1787 Err(err) => Some((Err(err), listener)),
1788 }
1789 });
1790
1791 let handle = tokio::spawn(async move {
1792 Server::builder()
1793 .add_service(RepoSyncServiceServer::new(service))
1794 .serve_with_incoming(incoming)
1795 .await
1796 .expect("serve sidecar-only test service");
1797 });
1798
1799 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
1800 .await
1801 .expect("connect client");
1802 Some((client, handle))
1803 }
1804
1805 #[tokio::test]
1806 async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
1807 let (_dir, repo) = temp_repo();
1808 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1809 let state = State::new_snapshot(
1810 tree_hash,
1811 vec![],
1812 Attribution::human(Principal {
1813 name: "Grace Hopper".into(),
1814 email: "grace@example.com".into(),
1815 }),
1816 );
1817 let state_id = state.change_id;
1818 repo.store().put_state(&state).expect("put state");
1819 assert!(
1820 repo.get_state_visibility_bytes_for_state(&state_id)
1821 .expect("load local sidecar")
1822 .is_none(),
1823 "test starts with state present and StateVisibility sidecar absent"
1824 );
1825
1826 let state_visibility_blob =
1827 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
1828 .encode()
1829 .expect("encode state visibility blob");
1830 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1831 state: state_id,
1832 state_visibility_blob,
1833 })
1834 .await
1835 else {
1836 return;
1837 };
1838
1839 let exchange = tokio::time::timeout(
1840 Duration::from_secs(5),
1841 client.pull_exchange(
1842 &repo,
1843 "owner/repo",
1844 "main",
1845 PullOptions {
1846 local_thread: None,
1847 depth: None,
1848 target_state: Some(state_id),
1849 materialization: PullMaterialization::Full,
1850 },
1851 ),
1852 )
1853 .await
1854 .expect("sidecar-only pull must not hang waiting for native pack")
1855 .expect("sidecar-only pull succeeds");
1856 server.abort();
1857
1858 assert!(exchange.result.success);
1859 assert_eq!(exchange.object_count, 0);
1860 assert_eq!(exchange.profile.pack_bytes_received, 0);
1861 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
1862 assert!(
1863 repo.get_state_visibility_for_state(&state_id)
1864 .expect("load accepted sidecar")
1865 .has_record(),
1866 "pull must accept the out-of-pack StateVisibility sidecar"
1867 );
1868 }
1869
1870 #[tokio::test]
1874 async fn legitimate_large_sidecar_blob_decodes_and_installs() {
1875 let (_dir, repo) = temp_repo();
1876 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1877 let state = State::new_snapshot(
1878 tree_hash,
1879 vec![],
1880 Attribution::human(Principal {
1881 name: "Grace Hopper".into(),
1882 email: "grace@example.com".into(),
1883 }),
1884 );
1885 let state_id = state.change_id;
1886 repo.store().put_state(&state).expect("put state");
1887
1888 let mut record = sample_state_visibility(state_id);
1891 record.tier = VisibilityTier::Restricted {
1892 scope_label: "x".repeat(8 * 1024 * 1024),
1893 };
1894 let state_visibility_blob = StateVisibilityBlob::new(vec![record])
1895 .encode()
1896 .expect("encode large state visibility blob");
1897 assert!(
1898 state_visibility_blob.len() > 4 * 1024 * 1024,
1899 "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
1900 );
1901 assert!(
1902 (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
1903 "blob must stay within the legitimate sidecar receive cap"
1904 );
1905
1906 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1907 state: state_id,
1908 state_visibility_blob,
1909 })
1910 .await
1911 else {
1912 return;
1913 };
1914
1915 let exchange = tokio::time::timeout(
1916 Duration::from_secs(30),
1917 client.pull_exchange(
1918 &repo,
1919 "owner/repo",
1920 "main",
1921 PullOptions {
1922 local_thread: None,
1923 depth: None,
1924 target_state: Some(state_id),
1925 materialization: PullMaterialization::Full,
1926 },
1927 ),
1928 )
1929 .await
1930 .expect("large-sidecar pull must not hang")
1931 .expect("large but legitimate sidecar pull succeeds");
1932 server.abort();
1933
1934 assert!(exchange.result.success);
1935 assert!(
1936 repo.get_state_visibility_for_state(&state_id)
1937 .expect("load accepted sidecar")
1938 .has_record(),
1939 "pull must accept a legitimately-large StateVisibility sidecar"
1940 );
1941 }
1942
1943 #[tokio::test]
1947 async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
1948 let (_dir, repo) = temp_repo();
1949 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
1950 let state = State::new_snapshot(
1951 tree_hash,
1952 vec![],
1953 Attribution::human(Principal {
1954 name: "Grace Hopper".into(),
1955 email: "grace@example.com".into(),
1956 }),
1957 );
1958 let state_id = state.change_id;
1959 repo.store().put_state(&state).expect("put state");
1960
1961 let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
1964 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
1965 state: state_id,
1966 state_visibility_blob: oversized,
1967 })
1968 .await
1969 else {
1970 return;
1971 };
1972
1973 let result = tokio::time::timeout(
1974 Duration::from_secs(30),
1975 client.pull_exchange(
1976 &repo,
1977 "owner/repo",
1978 "main",
1979 PullOptions {
1980 local_thread: None,
1981 depth: None,
1982 target_state: Some(state_id),
1983 materialization: PullMaterialization::Full,
1984 },
1985 ),
1986 )
1987 .await
1988 .expect("oversized-sidecar pull must not hang");
1989 server.abort();
1990
1991 let err = match result {
1992 Err(err) => err,
1993 Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
1994 };
1995 let message = err.to_string();
1996 assert!(
1997 !message.contains("exceeds receive size limit"),
1998 "rejection must come from the decode-size limit, before the post-decode check: {message}"
1999 );
2000 assert!(
2001 repo.get_state_visibility_for_state(&state_id)
2002 .expect("load sidecar")
2003 .latest()
2004 .expect("resolve visibility")
2005 .is_none(),
2006 "an oversized sidecar must never be installed"
2007 );
2008 }
2009
2010 #[derive(Clone)]
2011 struct StateAndVisibilityPullService {
2012 state: ChangeId,
2013 pack_bundle: wire::NativePackBundle,
2014 state_visibility_blob: Vec<u8>,
2015 }
2016
2017 #[tonic::async_trait]
2018 impl RepoSyncService for StateAndVisibilityPullService {
2019 async fn list_refs(
2020 &self,
2021 _request: tonic::Request<ListRefsRequest>,
2022 ) -> Result<Response<ListRefsResponse>, Status> {
2023 Ok(Response::new(ListRefsResponse::default()))
2024 }
2025
2026 async fn update_ref(
2027 &self,
2028 _request: tonic::Request<UpdateRefRequest>,
2029 ) -> Result<Response<UpdateRefResponse>, Status> {
2030 Ok(Response::new(UpdateRefResponse::default()))
2031 }
2032
2033 type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
2034
2035 async fn push(
2036 &self,
2037 _request: tonic::Request<tonic::Streaming<PushMessage>>,
2038 ) -> Result<Response<Self::PushStream>, Status> {
2039 let (_tx, rx) = mpsc::channel(1);
2040 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
2041 rx,
2042 )))
2043 }
2044
2045 type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
2046
2047 async fn pull(
2048 &self,
2049 request: tonic::Request<tonic::Streaming<PullMessage>>,
2050 ) -> Result<Response<Self::PullStream>, Status> {
2051 let state = self.state;
2052 let pack_bundle = self.pack_bundle.clone();
2053 let state_visibility_blob = self.state_visibility_blob.clone();
2054 let (tx, rx) = mpsc::channel(8);
2055
2056 tokio::spawn(async move {
2057 let mut inbound = request.into_inner();
2058 match inbound.message().await {
2059 Ok(Some(PullMessage {
2060 body: Some(pull_message::Body::Request(_)),
2061 })) => {}
2062 other => {
2063 let _ = tx
2064 .send(Err(Status::invalid_argument(format!(
2065 "expected pull request, got {other:?}"
2066 ))))
2067 .await;
2068 return;
2069 }
2070 }
2071
2072 let ready = PullMessage {
2073 body: Some(pull_message::Body::Ready(PullReady {
2074 remote_state: state.to_string_full(),
2075 objects_to_fetch: vec![
2076 object_descriptor_with_status(
2077 &state_info(state),
2078 ObjectAvailabilityStatus::Missing,
2079 "missing state",
2080 ),
2081 object_descriptor_with_status(
2082 &state_visibility_info(state),
2083 ObjectAvailabilityStatus::Missing,
2084 "missing state visibility",
2085 ),
2086 ],
2087 transfer: None,
2088 partial_fetch_status: PartialFetchStatus::Disabled as i32,
2089 missing_objects: Vec::new(),
2090 full_closure_available: false,
2091 object_count: 2,
2092 })),
2093 };
2094 if tx.send(Ok(ready)).await.is_err() {
2095 return;
2096 }
2097
2098 match inbound.message().await {
2099 Ok(Some(PullMessage {
2100 body: Some(pull_message::Body::Want(want)),
2101 })) if !want.want_full_closure
2102 && want.objects.len() == 2
2103 && want
2104 .objects
2105 .iter()
2106 .any(|object| object.object_type == "state")
2107 && want
2108 .objects
2109 .iter()
2110 .any(|object| object.object_type == "state_visibility") => {}
2111 other => {
2112 let _ = tx
2113 .send(Err(Status::invalid_argument(format!(
2114 "expected state + sidecar wants, got {other:?}"
2115 ))))
2116 .await;
2117 return;
2118 }
2119 }
2120
2121 for message in
2122 encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
2123 {
2124 if tx.send(Ok(message)).await.is_err() {
2125 return;
2126 }
2127 }
2128
2129 let transfer = PullMessage {
2130 body: Some(pull_message::Body::StateVisibility(
2131 StateVisibilityTransfer {
2132 state_id: state.to_string_full(),
2133 state_visibility_blob,
2134 },
2135 )),
2136 };
2137 if tx.send(Ok(transfer)).await.is_err() {
2138 return;
2139 }
2140
2141 let complete = PullMessage {
2142 body: Some(pull_message::Body::Complete(GrpcPullComplete {
2143 success: true,
2144 new_state: state.to_string_full(),
2145 error: String::new(),
2146 transfer: Some(TransferCheckpoint {
2147 transfer_id: "state-and-visibility-test".to_string(),
2148 transport_mode: TransportMode::NativePack as i32,
2149 resume_offset: 0,
2150 chunk_index: 0,
2151 checkpoint: b"heddle-markers-v1\n".to_vec(),
2152 is_complete: true,
2153 }),
2154 })),
2155 };
2156 let _ = tx.send(Ok(complete)).await;
2157 });
2158
2159 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
2160 rx,
2161 )))
2162 }
2163 }
2164
2165 fn encode_pull_native_pack_messages(
2166 bundle: &wire::NativePackBundle,
2167 transfer_id: &str,
2168 chunk_size: usize,
2169 ) -> Vec<PullMessage> {
2170 let mut messages = Vec::new();
2171 let chunk_size = chunk_size.max(1);
2172
2173 let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
2174 for chunk_index in 0..pack_total_chunks.max(1) {
2175 let Some((start, len)) =
2176 wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
2177 else {
2178 break;
2179 };
2180 messages.push(PullMessage {
2181 body: Some(pull_message::Body::Pack(PackChunk {
2182 stream_kind: PackStreamKind::Pack as i32,
2183 data: bundle.pack_data[start..start + len].to_vec(),
2184 transfer: Some(TransferCheckpoint {
2185 transfer_id: transfer_id.to_string(),
2186 transport_mode: TransportMode::NativePack as i32,
2187 resume_offset: start as u64,
2188 chunk_index: chunk_index as u32,
2189 checkpoint: Vec::new(),
2190 is_complete: chunk_index + 1 == pack_total_chunks,
2191 }),
2192 chunk_length: len as u32,
2193 is_final_chunk: chunk_index + 1 == pack_total_chunks,
2194 })),
2195 });
2196 }
2197
2198 let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
2199 for chunk_index in 0..index_total_chunks.max(1) {
2200 let Some((start, len)) =
2201 wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
2202 else {
2203 break;
2204 };
2205 messages.push(PullMessage {
2206 body: Some(pull_message::Body::Pack(PackChunk {
2207 stream_kind: PackStreamKind::Index as i32,
2208 data: bundle.index_data[start..start + len].to_vec(),
2209 transfer: Some(TransferCheckpoint {
2210 transfer_id: transfer_id.to_string(),
2211 transport_mode: TransportMode::NativePack as i32,
2212 resume_offset: start as u64,
2213 chunk_index: chunk_index as u32,
2214 checkpoint: Vec::new(),
2215 is_complete: chunk_index + 1 == index_total_chunks,
2216 }),
2217 chunk_length: len as u32,
2218 is_final_chunk: chunk_index + 1 == index_total_chunks,
2219 })),
2220 });
2221 }
2222
2223 messages
2224 }
2225
2226 async fn connect_state_and_visibility_service(
2227 service: StateAndVisibilityPullService,
2228 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
2229 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
2230 Ok(listener) => listener,
2231 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
2232 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
2233 return None;
2234 }
2235 Err(err) => panic!("bind test server: {err}"),
2236 };
2237 let addr = listener.local_addr().expect("local addr");
2238 let incoming = futures::stream::unfold(listener, |listener| async {
2239 match listener.accept().await {
2240 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
2241 Err(err) => Some((Err(err), listener)),
2242 }
2243 });
2244
2245 let handle = tokio::spawn(async move {
2246 Server::builder()
2247 .add_service(RepoSyncServiceServer::new(service))
2248 .serve_with_incoming(incoming)
2249 .await
2250 .expect("serve state-and-visibility test service");
2251 });
2252
2253 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
2254 .await
2255 .expect("connect client");
2256 Some((client, handle))
2257 }
2258
2259 #[tokio::test]
2260 async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
2261 let (_source_dir, source_repo) = temp_repo();
2262 let (_target_dir, target_repo) = temp_repo();
2263 let tree_hash = source_repo
2264 .store()
2265 .put_tree(&Tree::new())
2266 .expect("put tree");
2267 let state = State::new_snapshot(
2268 tree_hash,
2269 vec![],
2270 Attribution::human(Principal {
2271 name: "Grace Hopper".into(),
2272 email: "grace@example.com".into(),
2273 }),
2274 );
2275 let state_id = state.change_id;
2276 source_repo
2277 .store()
2278 .put_state(&state)
2279 .expect("put source state");
2280 let state_visibility_blob =
2281 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
2282 .encode()
2283 .expect("encode state visibility blob");
2284 source_repo
2285 .accept_wire_state_visibility(state_id, &state_visibility_blob)
2286 .expect("put source state visibility");
2287 let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
2288 .expect("build state pack");
2289
2290 assert!(
2291 target_repo
2292 .store()
2293 .get_state(&state_id)
2294 .expect("load target state")
2295 .is_none(),
2296 "test starts with state absent"
2297 );
2298 assert!(
2299 target_repo
2300 .get_state_visibility_bytes_for_state(&state_id)
2301 .expect("load target sidecar")
2302 .is_none(),
2303 "test starts with StateVisibility sidecar absent"
2304 );
2305
2306 let Some((mut client, server)) =
2307 connect_state_and_visibility_service(StateAndVisibilityPullService {
2308 state: state_id,
2309 pack_bundle,
2310 state_visibility_blob,
2311 })
2312 .await
2313 else {
2314 return;
2315 };
2316
2317 let exchange = tokio::time::timeout(
2318 Duration::from_secs(5),
2319 client.pull_exchange(
2320 &target_repo,
2321 "owner/repo",
2322 "main",
2323 PullOptions {
2324 local_thread: None,
2325 depth: None,
2326 target_state: Some(state_id),
2327 materialization: PullMaterialization::Full,
2328 },
2329 ),
2330 )
2331 .await
2332 .expect("state + sidecar pull must not hang waiting for native pack")
2333 .expect("state + sidecar pull succeeds");
2334 server.abort();
2335
2336 assert!(exchange.result.success);
2337 assert_eq!(exchange.object_count, 1);
2338 assert!(exchange.profile.pack_bytes_received > 0);
2339 assert_eq!(exchange.profile.object_mix.states, 1);
2340 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
2341 assert!(
2342 target_repo
2343 .store()
2344 .get_state(&state_id)
2345 .expect("load installed state")
2346 .is_some(),
2347 "native pack must install the State"
2348 );
2349 assert!(
2350 target_repo
2351 .get_state_visibility_for_state(&state_id)
2352 .expect("load accepted sidecar")
2353 .has_record(),
2354 "pull must accept the out-of-pack StateVisibility sidecar"
2355 );
2356 }
2357
2358 #[test]
2359 fn collect_missing_blobs_treats_absent_tree_as_empty() {
2360 let (_dir, repo) = temp_repo();
2361 let absent_tree = ContentHash::from_bytes([99u8; 32]);
2362
2363 let missing =
2364 collect_missing_blobs(&repo, &absent_tree).expect("absent tree is not an error");
2365
2366 assert!(missing.is_empty());
2367 }
2368
2369 #[test]
2370 fn collect_missing_blobs_reports_only_genuinely_missing_blobs() {
2371 let (_dir, repo) = temp_repo();
2372 let present_blob = Blob::from("already local");
2373 let present_hash = repo.store().put_blob(&present_blob).expect("put blob");
2374 let missing_hash = ContentHash::from_bytes([42u8; 32]);
2375 let tree = Tree::from_entries(vec![
2376 TreeEntry::file("local.txt", present_hash, false).expect("present entry"),
2377 TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry"),
2378 ]);
2379 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
2380
2381 let missing = collect_missing_blobs(&repo, &tree_hash).expect("collect missing blobs");
2382
2383 assert_eq!(missing, vec![missing_hash]);
2384 }
2385
2386 #[test]
2387 fn collect_missing_blobs_reports_corrupt_tree_read() {
2388 let (_dir, repo) = temp_repo();
2389 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
2390 std::fs::write(loose_tree_path(&repo, &tree_hash), [0xc1]).expect("corrupt tree");
2391 repo.store().clear_recent_caches();
2392
2393 let err = collect_missing_blobs(&repo, &tree_hash).expect_err("corrupt tree must fail");
2394
2395 assert!(matches!(err, ProtocolError::InvalidState(_)));
2396 assert!(
2397 err.to_string().contains(&format!(
2398 "load tree {} while collecting lazy hydration missing blobs",
2399 tree_hash.to_hex()
2400 )),
2401 "unexpected error: {err}"
2402 );
2403 }
2404
2405 #[test]
2406 fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
2407 let (_dir, repo) = temp_repo();
2408 let blob = sample_blob();
2409 repo.put_redaction(sample_redaction(blob))
2410 .expect("put redaction");
2411 let expected_bytes = repo
2412 .store()
2413 .get_redactions_bytes_for_blob(&blob)
2414 .expect("load sidecar")
2415 .expect("sidecar exists");
2416
2417 let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");
2418
2419 let Some(push_message::Body::Redaction(transfer)) = message.body else {
2420 panic!("expected redaction transfer");
2421 };
2422 assert_eq!(transfer.blob_hash, blob.to_hex());
2423 assert_eq!(transfer.redactions_blob, expected_bytes);
2424 }
2425
2426 #[test]
2427 fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
2428 let (_dir, repo) = temp_repo();
2429 let blob = sample_blob();
2430
2431 let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("missing sidecar");
2432
2433 assert!(matches!(err, ProtocolError::InvalidState(_)));
2434 assert!(
2435 err.to_string().contains(&format!(
2436 "server wants redaction for blob {} but sender has no sidecar",
2437 blob.to_hex()
2438 )),
2439 "unexpected error: {err}"
2440 );
2441 }
2442
2443 #[test]
2444 fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
2445 let (_dir, repo) = temp_repo();
2446 let blob = sample_blob();
2447 let redaction_path = repo
2448 .heddle_dir()
2449 .join("redactions")
2450 .join(format!("{}.bin", blob.to_hex()));
2451 std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");
2452
2453 let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("load error");
2454
2455 assert!(matches!(err, ProtocolError::InvalidState(_)));
2456 assert!(
2457 err.to_string()
2458 .contains(&format!("load redactions sidecar for {}:", blob.to_hex())),
2459 "unexpected error: {err}"
2460 );
2461 }
2462
2463 #[test]
2464 fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
2465 let (_dir, repo) = temp_repo();
2466 let state = ChangeId::from_bytes([17u8; 16]);
2467 repo.put_state_visibility(sample_state_visibility(state))
2468 .expect("put state visibility");
2469 let expected_bytes = repo
2470 .get_state_visibility_bytes_for_state(&state)
2471 .expect("load sidecar")
2472 .expect("sidecar exists");
2473
2474 let message =
2475 state_visibility_push_message(&repo, state_visibility_info(state)).expect("message");
2476
2477 let Some(push_message::Body::StateVisibility(transfer)) = message.body else {
2478 panic!("expected state visibility transfer");
2479 };
2480 assert_eq!(transfer.state_id, state.to_string_full());
2481 assert_eq!(transfer.state_visibility_blob, expected_bytes);
2482 }
2483}