1use std::{
2 collections::{BTreeMap, HashMap},
3 io::{self, Seek, SeekFrom, Write},
4 sync::{
5 Arc, Mutex,
6 atomic::{AtomicUsize, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use tempfile::NamedTempFile;
12
13use grpc::heddle::v1::{
14 GetBlobRequest, GitCheckpointTransfer, GitLaneTransfer, GitPackTransfer,
15 GitRefKind as GrpcGitRefKind,
16 GitRefUpdateTransfer, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
17 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
18 RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
19 ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
20 git_lane_transfer, pull_message, push_message,
21};
22use objects::{
23 Progress,
24 object::{ChangeId, ContentHash, MarkerName, ThreadName},
25 store::{AnyStore, ObjectStore, PackObjectId},
26};
27use repo::{
28 GitRefKind as ClassifiedGitRefKind, GitRefName, Repository, RepositoryCapability,
29 RevisionAddress, SyncedThreadMetadata,
30 ThreadManager,
31};
32use sley::{
33 ObjectId as GitObjectId, RefPrecondition, ReferenceTarget, Repository as SleyRepository,
34};
35use tokio::sync::mpsc;
36use tokio_stream::wrappers::ReceiverStream;
37use tonic::Request;
38use wire::{
39 GitLaneTransferIntent, ObjectInfo, ObjectType, ObjectTypeBucket, PlannedObject, ProtocolError,
40 PullComplete, PushComplete, RefEntry, RefUpdated, RepositoryTransferPlan,
41};
42
43use super::{
44 HostedGrpcClient, PullMaterialization,
45 helpers::{
46 descriptor_id, descriptor_id_from_info, object_descriptor_with_status, object_type_name,
47 parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
48 transport_mode_name,
49 },
50};
51
52#[derive(Clone, Copy)]
53struct PullOptions<'a> {
54 local_thread: Option<&'a str>,
55 depth: Option<u32>,
56 target_state: Option<ChangeId>,
57 materialization: PullMaterialization,
58}
59
60struct PullWantPlan {
61 wants: Vec<ObjectDescriptor>,
62 transfer_plan: RepositoryTransferPlan<ObjectInfo>,
63 wanted_types: WantedTypes,
64 want_full_closure: bool,
65}
66
/// Object types expected for each pack object id during a pull.
///
/// Looked up through `wanted_packable_type` when classifying the ids installed from the
/// native pack.
type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
68
69struct GitLanePushPlan {
70 local_revision_address: String,
71 pack: GitPackPushPlan,
72 ref_updates: Vec<PushMessage>,
78}
79
80#[derive(Clone)]
81struct GitPackPushPlan {
82 transfer_id: String,
83 pack_id: Vec<u8>,
84 pack_size: u64,
85 #[cfg_attr(not(test), allow(dead_code))]
90 roots: Vec<GitObjectId>,
91 pack_file: Arc<Mutex<NamedTempFile>>,
94}
95
/// Threshold passed to `wire::enumerate_state_closure_transfer_with_options` when a push
/// enumerates its state closure; presumably the largest closure for which full descriptors
/// (`full_objects`) are produced — confirm in `wire`.
const PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD: usize = 512;
/// A pull whose `PullReady` advertises more objects than this spools the native pack to disk
/// (`wire::PackChunkSpool` under the repository's heddle directory) instead of buffering it in
/// a `wire::PackChunkState`.
const PULL_PACK_SPOOL_OBJECT_THRESHOLD: usize = 512;
// NOTE(review): the three constants below are not used in this chunk; the descriptions are
// inferred from their names only — confirm at their use sites.
/// Presumably: how many objects are written between drains of the native pack stream.
const NATIVE_PACK_DRAIN_OBJECT_INTERVAL: usize = 32;
/// Presumably: maximum number of objects prefetched ahead while building a native pack.
const NATIVE_PACK_OBJECT_PREFETCH_LIMIT: usize = 32;
/// Presumably: maximum number of concurrent object-loading workers for native packs.
const NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT: usize = 8;
101
/// Per-bucket counts of objects received during a pull.
///
/// Each counter is bumped by `PullObjectMix::record` according to `ObjectType::bucket()`.
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Objects in the `ObjectTypeBucket::Blob` bucket.
    pub blobs: usize,
    /// Objects in the `ObjectTypeBucket::Tree` bucket.
    pub trees: usize,
    /// Objects in the `ObjectTypeBucket::State` bucket.
    pub states: usize,
    /// Objects in the `ObjectTypeBucket::Action` bucket.
    pub actions: usize,
    /// Objects in the `ObjectTypeBucket::Redaction` bucket.
    pub redactions: usize,
    /// Objects in the `ObjectTypeBucket::StateVisibility` bucket.
    pub state_visibilities: usize,
}
111
/// One ref reported by the hosted `ListRefs` RPC, including its revision address.
#[derive(Debug, Clone)]
pub struct HostedRefEntry {
    /// Ref name as reported by the server.
    pub name: String,
    /// Heddle change id the ref points at (decoded from the response bytes).
    pub change_id: ChangeId,
    /// Whether the server reports this ref as a thread.
    pub is_thread: bool,
    /// Raw revision address string from the server; the Git-mirror push parses it with
    /// `parse_git_ref_expectation`.
    pub revision_address: String,
}
119
120impl PullObjectMix {
121 fn record(&mut self, obj_type: ObjectType) {
122 match obj_type.bucket() {
123 ObjectTypeBucket::Blob => self.blobs += 1,
124 ObjectTypeBucket::Tree => self.trees += 1,
125 ObjectTypeBucket::State => self.states += 1,
126 ObjectTypeBucket::Action => self.actions += 1,
127 ObjectTypeBucket::Redaction => self.redactions += 1,
128 ObjectTypeBucket::StateVisibility => self.state_visibilities += 1,
129 }
130 }
131
132 pub fn total(&self) -> usize {
133 self.blobs
134 + self.trees
135 + self.states
136 + self.actions
137 + self.redactions
138 + self.state_visibilities
139 }
140}
141
/// Timing and volume breakdown of a single pull exchange (returned by `pull_profiled`).
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until `PullReady` was received.
    pub ready_wait: Duration,
    /// Time from sending `WantObjects` until `PullComplete` was received.
    pub receive_and_apply: Duration,
    /// Total time spent receiving pack chunks.
    pub decode: Duration,
    /// Time spent applying sidecars, Git lane transfers and installing the native pack.
    pub store_receive_object: Duration,
    /// Time spent on post-pull metadata (thread ref, missing-blob markers, markers).
    pub metadata_sync: Duration,
    /// Time spent receiving native pack chunks.
    pub pack_decode_apply: Duration,
    /// NOTE(review): not updated by the pull code in this chunk — confirm whether still used.
    pub raw_decode_apply: Duration,
    /// Time spent receiving native pack chunks.
    pub pack_decode: Duration,
    /// NOTE(review): not updated by the pull code in this chunk — confirm whether still used.
    pub raw_decode: Duration,
    /// Payload bytes received: pack chunks, sidecar blobs and Git lane transfers.
    pub bytes_received: usize,
    /// Payload bytes received in native pack chunks.
    pub pack_bytes_received: usize,
    /// NOTE(review): not updated by the pull code in this chunk — confirm whether still used.
    pub raw_bytes_received: usize,
    /// Number of object ids installed from the native pack.
    pub objects_received: usize,
    /// Received objects broken down by bucket.
    pub object_mix: PullObjectMix,
}
159
160impl HostedGrpcClient {
161 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
162 Ok(self
163 .list_refs_with_revision_addresses(repo_path)
164 .await?
165 .into_iter()
166 .map(|entry| RefEntry {
167 name: entry.name,
168 change_id: entry.change_id,
169 is_thread: entry.is_thread,
170 })
171 .collect())
172 }
173
174 pub async fn list_refs_with_revision_addresses(
175 &mut self,
176 repo_path: &str,
177 ) -> Result<Vec<HostedRefEntry>, ProtocolError> {
178 let mut request = Request::new(ListRefsRequest {
179 repo_path: repo_path.to_string(),
180 });
181 self.apply_signed_auth(&mut request, "/heddle.v1.RepoSyncService/ListRefs")?;
182 let response = self
183 .inner
184 .list_refs(request)
185 .await
186 .map_err(status_to_protocol_error)?
187 .into_inner();
188 response
189 .refs
190 .into_iter()
191 .map(|entry| {
192 Ok(HostedRefEntry {
193 name: entry.name,
194 change_id: ChangeId::try_from_slice(&entry.change_id)
195 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
196 is_thread: entry.is_thread,
197 revision_address: entry.revision_address,
198 })
199 })
200 .collect()
201 }
202
203 #[allow(clippy::too_many_arguments)]
204 pub async fn update_ref(
205 &mut self,
206 repo_path: &str,
207 name: &str,
208 is_thread: bool,
209 old_value: Option<ChangeId>,
210 new_value: ChangeId,
211 force: bool,
212 thread_metadata: Option<&SyncedThreadMetadata>,
213 ) -> Result<RefUpdated, ProtocolError> {
214 let mut request = Request::new(UpdateRefRequest {
215 repo_path: repo_path.to_string(),
216 name: name.to_string(),
217 is_thread,
218 force,
219 old_value: old_value
220 .map(|value| value.to_string_full())
221 .unwrap_or_default(),
222 new_value: new_value.to_string_full(),
223 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
224 old_revision_address: old_value
225 .map(|value| RevisionAddress::heddle(value).to_string())
226 .unwrap_or_default(),
227 new_revision_address: RevisionAddress::heddle(new_value).to_string(),
228 client_operation_id: String::new(),
229 });
230 self.apply_signed_auth(&mut request, "/heddle.v1.RepoSyncService/UpdateRef")?;
231 let response = self
232 .inner
233 .update_ref(request)
234 .await
235 .map_err(status_to_protocol_error)?
236 .into_inner();
237 Ok(RefUpdated {
238 success: response.success,
239 old_value: if response.old_value.is_empty() {
240 None
241 } else {
242 Some(
243 ChangeId::parse(&response.old_value)
244 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
245 )
246 },
247 error: (!response.error.is_empty()).then_some(response.error),
248 })
249 }
250
251 pub async fn push(
252 &mut self,
253 repo: &Repository,
254 repo_path: &str,
255 local_state: ChangeId,
256 target_thread: &str,
257 force: bool,
258 ) -> Result<PushComplete, ProtocolError> {
259 self.push_with_revision(
260 repo,
261 repo_path,
262 local_state,
263 target_thread,
264 force,
265 RevisionAddress::heddle(local_state).to_string(),
266 None,
267 &Progress::null(),
268 )
269 .await
270 }
271
272 pub async fn push_git_overlay_mirror(
287 &mut self,
288 repo: &Repository,
289 repo_path: &str,
290 local_state: ChangeId,
291 target_thread: &str,
292 force: bool,
293 progress: &Progress,
294 ) -> Result<PushComplete, ProtocolError> {
295 progress.set_phase("packing refs");
296 let remote_ref_expectations = self.git_mirror_ref_expectations(repo_path).await?;
297 let git_lane = build_git_mirror_push_plan(
298 repo,
299 self.transport.chunk_size.max(1),
300 &remote_ref_expectations,
301 )?;
302 let local_revision_address = git_lane.local_revision_address.clone();
303 self.push_with_revision(
304 repo,
305 repo_path,
306 local_state,
307 target_thread,
308 force,
309 local_revision_address,
310 Some(git_lane),
311 progress,
312 )
313 .await
314 }
315
316 async fn git_mirror_ref_expectations(
320 &mut self,
321 repo_path: &str,
322 ) -> Result<HashMap<String, GitRefRemoteExpectation>, ProtocolError> {
323 let remote_refs = self.list_refs_with_revision_addresses(repo_path).await?;
324 let mut expectations = HashMap::with_capacity(remote_refs.len());
325 for entry in remote_refs {
326 let expectation = parse_git_ref_expectation(&entry.revision_address)?;
327 expectations.insert(entry.name, expectation);
328 }
329 Ok(expectations)
330 }
331
332 #[allow(clippy::too_many_arguments)]
333 async fn push_with_revision(
334 &mut self,
335 repo: &Repository,
336 repo_path: &str,
337 local_state: ChangeId,
338 target_thread: &str,
339 force: bool,
340 local_revision_address: String,
341 git_lane: Option<GitLanePushPlan>,
342 progress: &Progress,
343 ) -> Result<PushComplete, ProtocolError> {
344 let _ = self.transport.chunk_size;
345 let _ = self.transport.resume_attempts;
346 let git_lane_intent = if git_lane.is_some() {
351 GitLaneTransferIntent::ExistingImplementation
352 } else {
353 GitLaneTransferIntent::HeddleObjectsOnly
354 };
355 let closure = wire::enumerate_state_closure_transfer_with_options(
356 repo.store(),
357 local_state,
358 wire::StateClosureOptions::default(),
359 PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD,
360 )?;
361 let object_plan =
362 RepositoryTransferPlan::from_planned_objects(closure.planned_objects, git_lane_intent);
363 let full_objects = closure.full_objects;
364 let object_count = full_objects
365 .as_ref()
366 .map_or(object_plan.stats.total_objects, std::vec::Vec::len);
367 let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
368 let transport_mode = preferred_transport_mode(&self.transport, object_count);
369 let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
370 let request_message = PushMessage {
371 body: Some(push_message::Body::Request(PushRequest {
372 repo_path: repo_path.to_string(),
373 local_state: local_state.to_string_full(),
374 target_thread: target_thread.to_string(),
375 create_thread: true,
376 force,
377 objects: full_objects.as_ref().map_or_else(
378 || {
379 object_plan
380 .partitions
381 .iter()
382 .map(to_proto_planned_object)
383 .collect()
384 },
385 |objects| objects.iter().map(to_proto_object_info).collect(),
386 ),
387 transfer: Some(self.transport.transfer_checkpoint_with_mode(
388 transfer_id.clone(),
389 transport_mode,
390 0,
391 0,
392 false,
393 )),
394 partial_fetch_status: partial_fetch_status_for_repo(repo),
395 allow_partial_fetch: true,
396 thread_metadata: thread_metadata
397 .map(|metadata| to_proto_thread_metadata(&metadata)),
398 local_revision_address,
399 client_operation_id: String::new(),
400 })),
401 };
402
403 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
404 tx.send(request_message).await.map_err(|_| {
405 ProtocolError::InvalidState("failed to initialize push stream".to_string())
406 })?;
407 let mut request = Request::new(ReceiverStream::new(rx));
408 self.apply_auth(&mut request)?;
409 let mut response = self
410 .inner
411 .push(request)
412 .await
413 .map_err(status_to_protocol_error)?
414 .into_inner();
415
416 let ready = match response.message().await.map_err(status_to_protocol_error)? {
417 Some(PushMessage {
418 body: Some(push_message::Body::Ready(ready)),
419 }) => ready,
420 _ => {
421 return Err(ProtocolError::InvalidState(
422 "expected PushReady from gRPC server".to_string(),
423 ));
424 }
425 };
426 let object_index = match full_objects {
427 Some(objects) => objects
428 .into_iter()
429 .map(|info| (descriptor_id_from_info(&info), info))
430 .collect::<HashMap<_, _>>(),
431 None => object_plan
432 .partitions
433 .iter()
434 .map(|object| {
435 (
436 descriptor_id_from_plan(object),
437 object_info_from_plan(object),
438 )
439 })
440 .collect::<HashMap<_, _>>(),
441 };
442
443 let ready_transport_mode = ready
444 .transfer
445 .as_ref()
446 .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
447 .unwrap_or(transport_mode);
448 let wanted_infos = ready
449 .want_objects
450 .into_iter()
451 .map(|want| {
452 object_index
453 .get(&descriptor_id(&want))
454 .cloned()
455 .ok_or_else(|| {
456 ProtocolError::InvalidState("server requested unknown object".to_string())
457 })
458 })
459 .collect::<Result<Vec<_>, _>>()?;
460
461 let wanted_plan = RepositoryTransferPlan::from_object_infos(wanted_infos, git_lane_intent);
462
463 if !wanted_plan.partitions.packable_objects.is_empty() {
464 send_native_pack_streaming_messages(
465 &tx,
466 repo,
467 &wanted_plan.partitions.packable_objects,
468 &transfer_id,
469 self.transport.chunk_size.max(1),
470 &self.transport,
471 ready_transport_mode,
472 )
473 .await?;
474 }
475
476 for info in wanted_plan.partitions.sidecar_objects {
477 let message = sidecar_push_message(repo, info)?;
478 tx.send(message).await.map_err(|_| {
479 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
480 })?;
481 }
482
483 if let Some(git_lane) = git_lane {
484 send_git_pack_streaming_messages(
487 &tx,
488 &git_lane.pack,
489 self.transport.chunk_size.max(1),
490 progress,
491 )
492 .await?;
493 progress.set_phase(format!("writing {} refs", git_lane.ref_updates.len()));
494 for ref_update in git_lane.ref_updates {
495 tx.send(ref_update).await.map_err(|_| {
496 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
497 })?;
498 }
499 }
500 drop(tx);
501
502 let result = match response.message().await.map_err(status_to_protocol_error)? {
503 Some(PushMessage {
504 body: Some(push_message::Body::Complete(complete)),
505 }) => PushComplete {
506 success: complete.success,
507 new_state: if complete.new_state.is_empty() {
508 None
509 } else {
510 Some(
511 ChangeId::parse(&complete.new_state)
512 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
513 )
514 },
515 error: (!complete.error.is_empty()).then_some(complete.error),
516 transfer_id: complete
517 .transfer
518 .as_ref()
519 .map(|transfer| transfer.transfer_id.clone())
520 .unwrap_or_default(),
521 transport_mode: complete
522 .transfer
523 .as_ref()
524 .map(|transfer| transport_mode_name(transfer.transport_mode))
525 .unwrap_or("raw-objects")
526 .to_string(),
527 resume_offset: complete
528 .transfer
529 .as_ref()
530 .map(|transfer| transfer.resume_offset)
531 .unwrap_or_default(),
532 chunk_index: complete
533 .transfer
534 .as_ref()
535 .map(|transfer| transfer.chunk_index)
536 .unwrap_or_default(),
537 checkpoint: complete
538 .transfer
539 .as_ref()
540 .map(|transfer| transfer.checkpoint.clone())
541 .unwrap_or_default(),
542 is_complete: complete
543 .transfer
544 .as_ref()
545 .map(|transfer| transfer.is_complete)
546 .unwrap_or(false),
547 },
548 _ => {
549 return Err(ProtocolError::InvalidState(
550 "expected PushComplete from gRPC server".to_string(),
551 ));
552 }
553 };
554
555 if result.success {
556 self.sync_remote_markers(repo, repo_path, local_state)
557 .await?;
558 }
559 Ok(result)
560 }
561
562 pub async fn pull(
563 &mut self,
564 repo: &Repository,
565 repo_path: &str,
566 remote_thread: &str,
567 local_thread: Option<&str>,
568 ) -> Result<PullComplete, ProtocolError> {
569 self.pull_with_options(
570 repo,
571 repo_path,
572 remote_thread,
573 PullOptions {
574 local_thread,
575 depth: None,
576 target_state: None,
577 materialization: PullMaterialization::Full,
578 },
579 )
580 .await
581 }
582
583 pub async fn pull_profiled(
584 &mut self,
585 repo: &Repository,
586 repo_path: &str,
587 remote_thread: &str,
588 local_thread: Option<&str>,
589 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
590 self.pull_exchange(
591 repo,
592 repo_path,
593 remote_thread,
594 PullOptions {
595 local_thread,
596 depth: None,
597 target_state: None,
598 materialization: PullMaterialization::Full,
599 },
600 )
601 .await
602 .map(|exchange| (exchange.result, exchange.profile))
603 }
604
605 pub async fn pull_partial(
606 &mut self,
607 repo: &Repository,
608 repo_path: &str,
609 remote_thread: &str,
610 local_thread: Option<&str>,
611 ) -> Result<PullComplete, ProtocolError> {
612 self.pull_with_options(
613 repo,
614 repo_path,
615 remote_thread,
616 PullOptions {
617 local_thread,
618 depth: None,
619 target_state: None,
620 materialization: PullMaterialization::Lazy,
621 },
622 )
623 .await
624 }
625
626 pub async fn pull_with_depth_and_materialization(
627 &mut self,
628 repo: &Repository,
629 repo_path: &str,
630 remote_thread: &str,
631 local_thread: Option<&str>,
632 depth: Option<u32>,
633 materialization: PullMaterialization,
634 ) -> Result<PullComplete, ProtocolError> {
635 self.pull_with_options(
636 repo,
637 repo_path,
638 remote_thread,
639 PullOptions {
640 local_thread,
641 depth,
642 target_state: None,
643 materialization,
644 },
645 )
646 .await
647 }
648
649 pub async fn pull_with_depth(
650 &mut self,
651 repo: &Repository,
652 repo_path: &str,
653 remote_thread: &str,
654 local_thread: Option<&str>,
655 depth: Option<u32>,
656 ) -> Result<PullComplete, ProtocolError> {
657 self.pull_with_depth_and_materialization(
658 repo,
659 repo_path,
660 remote_thread,
661 local_thread,
662 depth,
663 PullMaterialization::Full,
664 )
665 .await
666 }
667
668 pub async fn fetch_state(
669 &mut self,
670 repo: &Repository,
671 repo_path: &str,
672 remote_thread: &str,
673 target_state: ChangeId,
674 ) -> Result<usize, ProtocolError> {
675 self.pull_exchange(
676 repo,
677 repo_path,
678 remote_thread,
679 PullOptions {
680 local_thread: None,
681 depth: None,
682 target_state: Some(target_state),
683 materialization: PullMaterialization::Full,
684 },
685 )
686 .await
687 .map(|exchange| exchange.object_count)
688 }
689
690 pub async fn fetch_state_partial(
691 &mut self,
692 repo: &Repository,
693 repo_path: &str,
694 remote_thread: &str,
695 target_state: ChangeId,
696 ) -> Result<usize, ProtocolError> {
697 self.pull_exchange(
698 repo,
699 repo_path,
700 remote_thread,
701 PullOptions {
702 local_thread: None,
703 depth: None,
704 target_state: Some(target_state),
705 materialization: PullMaterialization::Lazy,
706 },
707 )
708 .await
709 .map(|exchange| exchange.object_count)
710 }
711
712 pub async fn hydrate_blob_at_path(
713 &mut self,
714 repo: &Repository,
715 repo_path: &str,
716 reference: &str,
717 path: &str,
718 ) -> Result<objects::object::Blob, ProtocolError> {
719 let mut request = Request::new(GetBlobRequest {
720 repo_path: repo_path.to_string(),
721 r#ref: reference.to_string(),
722 path: path.to_string(),
723 });
724 self.apply_signed_auth(&mut request, "/heddle.v1.ContentService/GetBlob")?;
725 let response = self
726 .content
727 .get_blob(request)
728 .await
729 .map_err(status_to_protocol_error)?
730 .into_inner();
731
732 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
733 let blob = objects::object::Blob::new(content);
734 repo.store().put_blob(&blob)?;
735 repo.clear_missing_blob(&blob.hash())?;
736 Ok(blob)
737 }
738
739 pub async fn hydrate_missing_blobs_for_state(
740 &mut self,
741 repo: &Repository,
742 repo_path: &str,
743 remote_thread: &str,
744 target_state: ChangeId,
745 ) -> Result<usize, ProtocolError> {
746 let exchange = self
747 .pull_exchange(
748 repo,
749 repo_path,
750 remote_thread,
751 PullOptions {
752 local_thread: None,
753 depth: None,
754 target_state: Some(target_state),
755 materialization: PullMaterialization::Full,
756 },
757 )
758 .await?;
759 clear_missing_blobs_for_state(repo, target_state)?;
760 Ok(exchange.object_count)
761 }
762
763 async fn pull_with_options(
764 &mut self,
765 repo: &Repository,
766 repo_path: &str,
767 remote_thread: &str,
768 options: PullOptions<'_>,
769 ) -> Result<PullComplete, ProtocolError> {
770 self.pull_exchange(repo, repo_path, remote_thread, options)
771 .await
772 .map(|exchange| exchange.result)
773 }
774
775 async fn pull_exchange(
776 &mut self,
777 repo: &Repository,
778 repo_path: &str,
779 remote_thread: &str,
780 options: PullOptions<'_>,
781 ) -> Result<PullExchange, ProtocolError> {
782 let exchange_start = Instant::now();
783 let mut exclude_states = Vec::new();
784 let advertised_head = if let Some(local_thread) = options.local_thread {
795 locally_complete_local_thread_head(repo, local_thread, options.target_state)?
796 } else {
797 locally_complete_pull_head(repo, remote_thread, options.target_state)?
798 };
799 if let Some(head) = advertised_head {
800 exclude_states.push(head);
801 }
802 let allow_partial_fetch = options.materialization.allows_partial_fetch();
803 let fresh_full_pull =
804 supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
805
806 let transfer_id = pull_transfer_id(
807 repo_path,
808 remote_thread,
809 options.local_thread,
810 options.depth,
811 options.target_state,
812 );
813 let request_message = PullMessage {
814 body: Some(pull_message::Body::Request(PullRequest {
815 repo_path: repo_path.to_string(),
816 remote_thread: remote_thread.to_string(),
817 local_thread: options.local_thread.unwrap_or_default().to_string(),
818 target_state: options
819 .target_state
820 .map(|value| value.to_string_full())
821 .unwrap_or_default(),
822 depth: options.depth.unwrap_or_default(),
823 exclude_states: exclude_states
824 .iter()
825 .map(ChangeId::to_string_full)
826 .collect(),
827 transfer: Some(self.transport.transfer_checkpoint_with_mode(
828 transfer_id.clone(),
829 TransportMode::NativePack,
830 0,
831 0,
832 false,
833 )),
834 partial_fetch_status: partial_fetch_status_for_repo(repo),
835 allow_partial_fetch,
836 fresh_full_pull,
837 target_revision_address: options
838 .target_state
839 .map(|state| RevisionAddress::heddle(state).to_string())
840 .unwrap_or_default(),
841 client_operation_id: String::new(),
842 })),
843 };
844
845 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
846 tx.send(request_message).await.map_err(|_| {
847 ProtocolError::InvalidState("failed to initialize pull stream".to_string())
848 })?;
849 let mut request = Request::new(ReceiverStream::new(rx));
850 self.apply_auth(&mut request)?;
851 let mut response = self
852 .inner
853 .pull(request)
854 .await
855 .map_err(status_to_protocol_error)?
856 .into_inner();
857
858 let ready = match response.message().await.map_err(status_to_protocol_error)? {
859 Some(PullMessage {
860 body: Some(pull_message::Body::Ready(ready)),
861 }) => ready,
862 _ => {
863 return Err(ProtocolError::InvalidState(
864 "expected PullReady from gRPC server".to_string(),
865 ));
866 }
867 };
868 let mut profile = PullProfile {
869 ready_wait: exchange_start.elapsed(),
870 ..PullProfile::default()
871 };
872 let remote_state = ChangeId::parse(&ready.remote_state)
873 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
874 let advertised_object_count = ready.objects_to_fetch.len();
875 let PullWantPlan {
876 wants,
877 transfer_plan,
878 wanted_types,
879 want_full_closure,
880 } = plan_pull_wants(
881 repo,
882 &remote_state,
883 ready.full_closure_available,
884 ready.objects_to_fetch,
885 allow_partial_fetch,
886 )?;
887 let native_pack_required = native_pack_required_for_pull(want_full_closure, &transfer_plan);
888
889 tx.send(PullMessage {
890 body: Some(pull_message::Body::Want(WantObjects {
891 objects: wants.clone(),
892 want_full_closure,
893 transfer: Some(self.transport.transfer_checkpoint_with_mode(
894 transfer_id.clone(),
895 TransportMode::NativePack,
896 0,
897 0,
898 false,
899 )),
900 })),
901 })
902 .await
903 .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
904 drop(tx);
905
906 let receive_start = Instant::now();
907 let use_pack_spool = advertised_object_count > PULL_PACK_SPOOL_OBJECT_THRESHOLD;
908 let mut pack_state = wire::PackChunkState::default();
909 let mut pack_spool = if use_pack_spool {
910 Some(wire::PackChunkSpool::new_in(repo.heddle_dir())?)
911 } else {
912 None
913 };
914 let mut git_lane_repo = None;
915 let mut git_pack_state = GitPackPullInstallState::default();
916 let mut received = 0usize;
917 while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
918 match message.body {
919 Some(pull_message::Body::Pack(chunk)) => {
920 profile.bytes_received =
921 profile.bytes_received.saturating_add(chunk.data.len());
922 profile.pack_bytes_received =
923 profile.pack_bytes_received.saturating_add(chunk.data.len());
924 let transfer = chunk.transfer.as_ref().ok_or_else(|| {
925 ProtocolError::InvalidState(
926 "native pack chunk missing transfer checkpoint".to_string(),
927 )
928 })?;
929 let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
930 .unwrap_or(PackStreamKind::Unspecified);
931 if stream_kind == PackStreamKind::Unspecified {
932 return Err(ProtocolError::InvalidState(
933 "native pack chunk missing stream kind".to_string(),
934 ));
935 }
936 let decode_start = Instant::now();
937 if let Some(pack_spool) = pack_spool.as_mut() {
938 pack_spool.receive_chunk(
939 stream_kind == PackStreamKind::Index,
940 transfer.resume_offset,
941 transfer.chunk_index,
942 transfer.is_complete,
943 &chunk.data,
944 chunk.is_final_chunk,
945 )?;
946 } else {
947 wire::receive_pack_chunk(
948 &mut pack_state,
949 stream_kind == PackStreamKind::Index,
950 transfer.resume_offset,
951 transfer.chunk_index,
952 transfer.is_complete,
953 &chunk.data,
954 chunk.is_final_chunk,
955 )?;
956 }
957 let decode_elapsed = decode_start.elapsed();
958 profile.pack_decode += decode_elapsed;
959 profile.pack_decode_apply += decode_elapsed;
960 profile.decode += decode_elapsed;
961 }
962 Some(pull_message::Body::Redaction(transfer)) => {
963 wire::check_received_transfer_blob_size(
969 transfer.redactions_blob.len(),
970 wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
971 "redactions",
972 )?;
973 profile.bytes_received = profile
974 .bytes_received
975 .saturating_add(transfer.redactions_blob.len());
976 profile.object_mix.record(ObjectType::Redaction);
977 let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
978 ProtocolError::InvalidState(format!(
979 "RedactionTransfer.blob_hash is not a valid content hash: {err}"
980 ))
981 })?;
982 let decode_start = Instant::now();
983 repo.accept_wire_redactions(blob, &transfer.redactions_blob)
984 .map_err(|err| {
985 ProtocolError::InvalidState(format!(
986 "accept_wire_redactions for blob {}: {err}",
987 transfer.blob_hash
988 ))
989 })?;
990 let decode_elapsed = decode_start.elapsed();
991 profile.store_receive_object += decode_elapsed;
992 }
993 Some(pull_message::Body::StateVisibility(transfer)) => {
994 wire::check_received_transfer_blob_size(
995 transfer.state_visibility_blob.len(),
996 wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
997 "state-visibility",
998 )?;
999 profile.bytes_received = profile
1000 .bytes_received
1001 .saturating_add(transfer.state_visibility_blob.len());
1002 profile.object_mix.record(ObjectType::StateVisibility);
1003 let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
1004 ProtocolError::InvalidState(format!(
1005 "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
1006 ))
1007 })?;
1008 let decode_start = Instant::now();
1009 repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
1010 .map_err(|err| {
1011 ProtocolError::InvalidState(format!(
1012 "accept_wire_state_visibility for state {}: {err}",
1013 transfer.state_id
1014 ))
1015 })?;
1016 let decode_elapsed = decode_start.elapsed();
1017 profile.store_receive_object += decode_elapsed;
1018 }
1019 Some(pull_message::Body::GitLane(transfer)) => {
1020 let decode_start = Instant::now();
1021 profile.bytes_received = profile
1022 .bytes_received
1023 .saturating_add(git_lane_transfer_size(&transfer));
1024 accept_git_lane_pull_transfer(
1025 repo,
1026 &mut git_lane_repo,
1027 &mut git_pack_state,
1028 transfer,
1029 )?;
1030 let decode_elapsed = decode_start.elapsed();
1031 profile.store_receive_object += decode_elapsed;
1032 }
1033 Some(pull_message::Body::Complete(complete)) => {
1034 profile.receive_and_apply = receive_start.elapsed();
1035 git_pack_state.ensure_idle()?;
1036 let final_state = if complete.new_state.is_empty() {
1037 None
1038 } else {
1039 Some(
1040 ChangeId::parse(&complete.new_state)
1041 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
1042 )
1043 };
1044
1045 if complete.success {
1046 if native_pack_required {
1047 let store_start = Instant::now();
1048 let installed_ids = if let Some(pack_spool) = pack_spool.as_mut() {
1049 if !pack_spool.is_complete() {
1050 return Err(ProtocolError::InvalidState(
1051 "pull completed before native pack stream finished"
1052 .to_string(),
1053 ));
1054 }
1055 pack_spool.install_into(repo.store())?
1056 } else {
1057 if !pack_state.is_complete() {
1058 return Err(ProtocolError::InvalidState(
1059 "pull completed before native pack stream finished"
1060 .to_string(),
1061 ));
1062 }
1063 wire::install_received_pack(
1064 repo.store(),
1065 &pack_state.pack_data,
1066 &pack_state.index_data,
1067 )?
1068 };
1069 profile.store_receive_object += store_start.elapsed();
1070 received = installed_ids.len();
1071 for id in installed_ids {
1072 match (id, wanted_packable_type(&wanted_types, &id)) {
1073 (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
1074 profile.object_mix.record(ObjectType::Blob);
1075 repo.clear_missing_blob(&hash)?;
1076 }
1077 (_, Some(obj_type)) => {
1078 profile.object_mix.record(obj_type);
1079 }
1080 (PackObjectId::ChangeId(_), None) => {
1081 profile.object_mix.record(ObjectType::State);
1082 }
1083 (PackObjectId::Hash(hash), None) => {
1084 let inferred =
1085 infer_installed_hash_object_type(repo, &hash)?;
1086 profile.object_mix.record(inferred);
1087 }
1088 }
1089 }
1090 }
1091
1092 let metadata_start = Instant::now();
1093 if let Some(local_thread) = options.local_thread
1094 && let Some(state) = final_state
1095 {
1096 repo.refs()
1097 .set_thread(&ThreadName::from(local_thread), &state)?;
1098 }
1099 if let Some(state) = final_state
1100 && allow_partial_fetch
1101 {
1102 mark_missing_blobs_for_state(repo, state)?;
1103 } else if final_state.is_some() {
1104 let _ = repo.clear_all_missing_blobs()?;
1105 }
1106 let synced_markers = complete
1107 .transfer
1108 .as_ref()
1109 .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
1110 .transpose()?
1111 .unwrap_or(false);
1112 if !synced_markers {
1113 self.sync_local_markers(repo, repo_path).await?;
1114 }
1115 profile.metadata_sync = metadata_start.elapsed();
1116 profile.objects_received = received;
1117 return Ok(PullExchange {
1118 result: PullComplete {
1119 success: true,
1120 final_state,
1121 error: None,
1122 transfer_id: complete
1123 .transfer
1124 .as_ref()
1125 .map(|transfer| transfer.transfer_id.clone())
1126 .unwrap_or_default(),
1127 transport_mode: complete
1128 .transfer
1129 .as_ref()
1130 .map(|transfer| transport_mode_name(transfer.transport_mode))
1131 .unwrap_or("native-pack")
1132 .to_string(),
1133 resume_offset: complete
1134 .transfer
1135 .as_ref()
1136 .map(|transfer| transfer.resume_offset)
1137 .unwrap_or_default(),
1138 chunk_index: complete
1139 .transfer
1140 .as_ref()
1141 .map(|transfer| transfer.chunk_index)
1142 .unwrap_or_default(),
1143 checkpoint: complete
1144 .transfer
1145 .as_ref()
1146 .map(|transfer| transfer.checkpoint.clone())
1147 .unwrap_or_default(),
1148 is_complete: complete
1149 .transfer
1150 .as_ref()
1151 .map(|transfer| transfer.is_complete)
1152 .unwrap_or(false),
1153 },
1154 object_count: received,
1155 profile,
1156 });
1157 }
1158
1159 profile.objects_received = received;
1160 return Ok(PullExchange {
1161 result: PullComplete {
1162 success: false,
1163 final_state,
1164 error: (!complete.error.is_empty()).then_some(complete.error),
1165 transfer_id: complete
1166 .transfer
1167 .as_ref()
1168 .map(|transfer| transfer.transfer_id.clone())
1169 .unwrap_or_default(),
1170 transport_mode: complete
1171 .transfer
1172 .as_ref()
1173 .map(|transfer| transport_mode_name(transfer.transport_mode))
1174 .unwrap_or("native-pack")
1175 .to_string(),
1176 resume_offset: complete
1177 .transfer
1178 .as_ref()
1179 .map(|transfer| transfer.resume_offset)
1180 .unwrap_or_default(),
1181 chunk_index: complete
1182 .transfer
1183 .as_ref()
1184 .map(|transfer| transfer.chunk_index)
1185 .unwrap_or_default(),
1186 checkpoint: complete
1187 .transfer
1188 .as_ref()
1189 .map(|transfer| transfer.checkpoint.clone())
1190 .unwrap_or_default(),
1191 is_complete: complete
1192 .transfer
1193 .as_ref()
1194 .map(|transfer| transfer.is_complete)
1195 .unwrap_or(false),
1196 },
1197 object_count: received,
1198 profile,
1199 });
1200 }
1201 _ => {}
1202 }
1203 }
1204
1205 Err(ProtocolError::InvalidState(format!(
1206 "pull stream ended unexpectedly after receiving {received} packed objects"
1207 )))
1208 }
1209}
1210
1211fn redaction_push_message(
1212 repo: &Repository,
1213 info: wire::ObjectInfo,
1214) -> Result<PushMessage, ProtocolError> {
1215 let wire::ObjectId::Hash(blob) = info.id else {
1216 return Err(ProtocolError::InvalidState(
1217 "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
1218 ));
1219 };
1220 let hex = blob.to_hex();
1221 let bytes = repo
1226 .store()
1227 .get_redactions_bytes_for_blob(&blob)
1228 .map_err(|err| {
1229 ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
1230 })?
1231 .ok_or_else(|| {
1232 ProtocolError::InvalidState(format!(
1233 "server wants redaction for blob {} but sender has no sidecar",
1234 hex
1235 ))
1236 })?;
1237 Ok(PushMessage {
1238 body: Some(push_message::Body::Redaction(RedactionTransfer {
1239 blob_hash: hex,
1240 redactions_blob: bytes.into(),
1241 })),
1242 })
1243}
1244
1245fn native_pack_required_for_pull(
1246 want_full_closure: bool,
1247 transfer_plan: &RepositoryTransferPlan<ObjectInfo>,
1248) -> bool {
1249 transfer_plan.requires_native_pack(want_full_closure)
1250}
1251
1252fn object_info_from_plan(object: &PlannedObject) -> ObjectInfo {
1253 ObjectInfo {
1254 id: object.id.clone(),
1255 obj_type: object.obj_type,
1256 size: 0,
1257 delta_base: None,
1258 }
1259}
1260
1261fn to_proto_planned_object(object: &PlannedObject) -> ObjectDescriptor {
1262 object_descriptor_with_status(
1263 &object_info_from_plan(object),
1264 ObjectAvailabilityStatus::Present,
1265 "",
1266 )
1267}
1268
1269fn descriptor_id_from_plan(object: &PlannedObject) -> (String, String) {
1270 let id = match &object.id {
1271 wire::ObjectId::Hash(hash) => hash.to_hex(),
1272 wire::ObjectId::ChangeId(change_id) => change_id.to_string_full(),
1273 };
1274 (id, object_type_name(object.obj_type).to_string())
1275}
1276
1277fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
1278 let types = wanted_types.entry(pack_id).or_default();
1279 if !types.contains(&obj_type) {
1280 types.push(obj_type);
1281 }
1282}
1283
1284fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
1285 wanted_types.get(pack_id).and_then(|types| {
1286 types
1287 .iter()
1288 .copied()
1289 .find(|obj_type| obj_type.packable())
1290 })
1291}
1292
1293fn sidecar_push_message(
1294 repo: &Repository,
1295 info: wire::ObjectInfo,
1296) -> Result<PushMessage, ProtocolError> {
1297 match info.obj_type {
1298 ObjectType::Redaction => redaction_push_message(repo, info),
1299 ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1300 obj_type => Err(ProtocolError::InvalidState(format!(
1301 "{obj_type:?} is not an out-of-pack sidecar object"
1302 ))),
1303 }
1304}
1305
1306fn state_visibility_push_message(
1307 repo: &Repository,
1308 info: wire::ObjectInfo,
1309) -> Result<PushMessage, ProtocolError> {
1310 let wire::ObjectId::ChangeId(state) = info.id else {
1311 return Err(ProtocolError::InvalidState(
1312 "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1313 ));
1314 };
1315 let state_id = state.to_string_full();
1316 let bytes = repo
1317 .get_state_visibility_bytes_for_state(&state)
1318 .map_err(|err| {
1319 ProtocolError::InvalidState(format!(
1320 "load state-visibility sidecar for {}: {err}",
1321 state_id
1322 ))
1323 })?
1324 .ok_or_else(|| {
1325 ProtocolError::InvalidState(format!(
1326 "server wants state visibility for state {} but sender has no sidecar",
1327 state_id
1328 ))
1329 })?;
1330 Ok(PushMessage {
1331 body: Some(push_message::Body::StateVisibility(
1332 StateVisibilityTransfer {
1333 state_id,
1334 state_visibility_blob: bytes.into(),
1335 },
1336 )),
1337 })
1338}
1339
1340fn load_thread_metadata(
1341 repo: &Repository,
1342 target_thread: &str,
1343 local_state: ChangeId,
1344) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1345 let thread_manager = ThreadManager::new(repo.heddle_dir());
1346 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1347}
1348
1349fn plan_pull_wants(
1350 repo: &Repository,
1351 remote_state: &ChangeId,
1352 full_closure_available: bool,
1353 objects_to_fetch: Vec<ObjectDescriptor>,
1354 allow_partial_fetch: bool,
1355) -> Result<PullWantPlan, ProtocolError> {
1356 if full_closure_available {
1357 return Ok(PullWantPlan {
1358 wants: Vec::new(),
1359 transfer_plan: RepositoryTransferPlan::from_object_infos(
1360 Vec::<ObjectInfo>::new(),
1361 GitLaneTransferIntent::HeddleObjectsOnly,
1362 ),
1363 wanted_types: HashMap::new(),
1364 want_full_closure: true,
1365 });
1366 }
1367 let request_full_closure =
1368 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1369 let mut wants = Vec::with_capacity(objects_to_fetch.len());
1370 let mut wanted_infos = Vec::with_capacity(objects_to_fetch.len());
1371 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1372
1373 for descriptor in objects_to_fetch {
1374 let info = parse_descriptor_to_info(descriptor)?;
1375 let pack_id = match &info.id {
1376 wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1377 wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1378 };
1379 let include = if request_full_closure {
1380 true
1381 } else {
1382 let has = wire::has_object(repo.store(), &info)?;
1383 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1384 };
1385
1386 if include {
1387 record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1388 wants.push(object_descriptor_with_status(
1389 &info,
1390 ObjectAvailabilityStatus::Missing,
1391 "requested by client",
1392 ));
1393 wanted_infos.push(info);
1394 }
1395 }
1396
1397 Ok(PullWantPlan {
1398 wants,
1399 transfer_plan: RepositoryTransferPlan::from_object_infos(
1400 wanted_infos,
1401 GitLaneTransferIntent::HeddleObjectsOnly,
1402 ),
1403 wanted_types,
1404 want_full_closure: false,
1405 })
1406}
1407
1408fn supports_compact_full_pull(
1409 repo: &Repository,
1410 allow_partial_fetch: bool,
1411 exclude_states: &[ChangeId],
1412) -> Result<bool, ProtocolError> {
1413 if allow_partial_fetch || !exclude_states.is_empty() {
1414 return Ok(false);
1415 }
1416 repo_looks_fresh(repo)
1417}
1418
1419fn locally_complete_pull_head(
1442 repo: &Repository,
1443 remote_thread: &str,
1444 target_state: Option<ChangeId>,
1445) -> Result<Option<ChangeId>, ProtocolError> {
1446 locally_complete_thread_head(repo, remote_thread, target_state)
1447}
1448
1449fn locally_complete_local_thread_head(
1458 repo: &Repository,
1459 local_thread: &str,
1460 target_state: Option<ChangeId>,
1461) -> Result<Option<ChangeId>, ProtocolError> {
1462 locally_complete_thread_head(repo, local_thread, target_state)
1463}
1464
1465fn locally_complete_thread_head(
1485 repo: &Repository,
1486 thread: &str,
1487 target_state: Option<ChangeId>,
1488) -> Result<Option<ChangeId>, ProtocolError> {
1489 if target_state.is_some() {
1492 return Ok(None);
1493 }
1494 if !repo.missing_blobs()?.is_empty() {
1497 return Ok(None);
1498 }
1499 let Some(head) = repo.refs().get_thread(&ThreadName::from(thread))? else {
1500 return Ok(None);
1503 };
1504 match wire::enumerate_state_closure(repo.store(), head) {
1509 Ok(_) => Ok(Some(head)),
1510 Err(ProtocolError::ObjectNotFound(_)) => Ok(None),
1511 Err(err) => Err(err),
1512 }
1513}
1514
1515fn should_request_full_closure(
1516 repo: &Repository,
1517 remote_state: &ChangeId,
1518 allow_partial_fetch: bool,
1519) -> Result<bool, ProtocolError> {
1520 if allow_partial_fetch || repo.store().has_state(remote_state)? {
1521 return Ok(false);
1522 }
1523 repo_looks_fresh(repo)
1524}
1525
1526fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1527 if repo.head()?.is_some() {
1528 return Ok(false);
1529 }
1530 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1531 return Ok(false);
1532 }
1533 Ok(repo.missing_blobs()?.is_empty())
1534}
1535
1536fn infer_installed_hash_object_type(
1537 repo: &Repository,
1538 hash: &ContentHash,
1539) -> Result<ObjectType, ProtocolError> {
1540 let store = repo.store();
1541 if store.get_tree(hash)?.is_some() {
1542 return Ok(ObjectType::Tree);
1543 }
1544 if store
1545 .get_action(&objects::object::ActionId::from_hash(*hash))?
1546 .is_some()
1547 {
1548 return Ok(ObjectType::Action);
1549 }
1550 Ok(ObjectType::Blob)
1551}
1552
1553fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1554 const HEADER: &str = "heddle-markers-v1\n";
1555 if checkpoint.is_empty() {
1556 return Ok(false);
1557 }
1558 let payload = std::str::from_utf8(checkpoint)
1559 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1560 let Some(lines) = payload.strip_prefix(HEADER) else {
1561 return Ok(false);
1562 };
1563
1564 for line in lines.lines() {
1565 if line.is_empty() {
1566 continue;
1567 }
1568 let Some((name, change_id)) = line.split_once('\t') else {
1569 return Err(ProtocolError::InvalidState(
1570 "invalid marker snapshot line".to_string(),
1571 ));
1572 };
1573 let change_id = ChangeId::parse(change_id)
1574 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1575 if !repo.store().has_state(&change_id)? {
1576 continue;
1577 }
1578 let name = MarkerName::from(name);
1579 match repo.refs().get_marker(&name)? {
1580 Some(existing) if existing == change_id => {}
1581 Some(existing) => repo.refs().set_marker_cas(
1582 &name,
1583 refs::RefExpectation::Value(existing),
1584 &change_id,
1585 )?,
1586 None => repo.refs().create_marker(&name, &change_id)?,
1587 }
1588 }
1589
1590 Ok(true)
1591}
1592
1593fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1594 if s.is_empty() {
1595 return Vec::new();
1596 }
1597 objects::object::ChangeId::parse(s)
1598 .map(|id| id.as_bytes().to_vec())
1599 .unwrap_or_default()
1600}
1601
1602fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1603 ThreadMetadata {
1604 name: metadata.thread.clone(),
1605 target_thread: metadata.target_thread.clone(),
1606 parent_thread: metadata.parent_thread.clone(),
1607 task: metadata.task.clone(),
1608 thread_mode: metadata.mode.to_string(),
1609 thread_state: metadata.state.to_string(),
1610 freshness: metadata.freshness.to_string(),
1611 base_state: change_id_string_to_bytes(&metadata.base_state),
1612 base_root: change_id_string_to_bytes(&metadata.base_root),
1613 current_state: metadata
1614 .current_state
1615 .as_deref()
1616 .map(change_id_string_to_bytes),
1617 merged_state: metadata
1618 .merged_state
1619 .as_deref()
1620 .map(change_id_string_to_bytes),
1621 changed_paths: metadata.changed_paths.clone(),
1622 impact_categories: metadata
1623 .impact_categories
1624 .iter()
1625 .map(ToString::to_string)
1626 .collect(),
1627 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1628 promotion_suggested: metadata.promotion_suggested,
1629 verification_summary: Some(ThreadVerificationSummary {
1630 tests_passed: metadata.verification_summary.tests_passed,
1631 tests_failed: metadata
1632 .verification_summary
1633 .tests_failed
1634 .unwrap_or_default(),
1635 coverage_pct: metadata.verification_summary.coverage_pct,
1636 lint_warnings: metadata.verification_summary.lint_warnings,
1637 }),
1638 confidence_summary: Some(ThreadConfidenceSummary {
1639 value: metadata.confidence_summary.value,
1640 band: metadata
1641 .confidence_summary
1642 .band
1643 .as_ref()
1644 .map(ToString::to_string),
1645 }),
1646 integration_policy_result: Some(ThreadIntegrationPolicy {
1647 status: metadata
1648 .integration_policy_result
1649 .status
1650 .clone()
1651 .unwrap_or_default(),
1652 reason: metadata
1653 .integration_policy_result
1654 .reason
1655 .clone()
1656 .unwrap_or_default(),
1657 }),
1658 created_at: Some(prost_types::Timestamp {
1659 seconds: metadata.created_at.timestamp(),
1660 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1661 }),
1662 updated_at: Some(prost_types::Timestamp {
1663 seconds: metadata.updated_at.timestamp(),
1664 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1665 }),
1666 }
1667}
1668
/// Outcome of one pull exchange with the server.
struct PullExchange {
    /// Completion summary returned to the caller: success flag, final state, error,
    /// and transfer/resume bookkeeping (id, mode, offset, chunk index, checkpoint).
    result: PullComplete,
    /// Number of objects received during the exchange.
    object_count: usize,
    /// Timing and counter profile collected while pulling.
    profile: PullProfile,
}
1674
1675fn mark_missing_blobs_for_state(
1676 repo: &Repository,
1677 state_id: ChangeId,
1678) -> Result<(), ProtocolError> {
1679 let state = repo
1680 .store()
1681 .get_state(&state_id)?
1682 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1683 let mut missing = wire::missing_blobs_in_tree(repo.store(), state.tree)?;
1684 if let Some(context_root) = state.context.as_ref() {
1685 missing.extend(wire::missing_blobs_in_tree(repo.store(), *context_root)?);
1686 }
1687 if let Some(discussions_blob) = state.discussions.as_ref()
1688 && !repo.store().has_blob(discussions_blob)?
1689 {
1690 missing.push(*discussions_blob);
1691 }
1692 missing
1693 .into_iter()
1694 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1695}
1696
1697fn clear_missing_blobs_for_state(
1698 repo: &Repository,
1699 state_id: ChangeId,
1700) -> Result<(), ProtocolError> {
1701 let state = repo
1702 .store()
1703 .get_state(&state_id)?
1704 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1705 let mut missing = wire::missing_blobs_in_tree(repo.store(), state.tree)?;
1706 if let Some(context_root) = state.context.as_ref() {
1707 missing.extend(wire::missing_blobs_in_tree(repo.store(), *context_root)?);
1708 }
1709 if let Some(discussions_blob) = state.discussions.as_ref() {
1710 missing.push(*discussions_blob);
1711 }
1712 missing
1713 .into_iter()
1714 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1715}
1716
1717fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1718 match repo.missing_blobs() {
1719 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1720 Ok(_) => PartialFetchStatus::Disabled as i32,
1721 Err(_) => PartialFetchStatus::Unspecified as i32,
1722 }
1723}
1724
1725fn pull_transfer_id(
1726 repo_path: &str,
1727 remote_thread: &str,
1728 local_thread: Option<&str>,
1729 depth: Option<u32>,
1730 target_state: Option<ChangeId>,
1731) -> String {
1732 format!(
1733 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1734 local_thread.unwrap_or_default(),
1735 target_state
1736 .map(|value| value.to_string_full())
1737 .unwrap_or_default()
1738 )
1739}
1740
1741fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1742 format!(
1743 "push:{repo_path}:{}:{target_thread}",
1744 local_state.to_string_full()
1745 )
1746}
1747
1748fn build_git_mirror_push_plan(
1762 repo: &Repository,
1763 chunk_size: usize,
1764 remote_ref_expectations: &HashMap<String, GitRefRemoteExpectation>,
1765) -> Result<GitLanePushPlan, ProtocolError> {
1766 if repo.capability() != RepositoryCapability::GitOverlay {
1767 return Err(ProtocolError::InvalidState(
1768 "Git mirror pushes require a git-overlay repository".to_string(),
1769 ));
1770 }
1771 let git_repo = repo
1772 .git_overlay_sley_repository()
1773 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1774 .ok_or_else(|| {
1775 ProtocolError::InvalidState("git-overlay repository has no Git store".to_string())
1776 })?;
1777 build_git_mirror_plan_from_sley(&git_repo, chunk_size, remote_ref_expectations)
1778}
1779
/// Plans a mirror push of every shareable direct ref in `git_repo`.
///
/// For each ref returned by `list_refs()`:
/// - refs for which `GitRefName::is_local_only` is true are skipped;
/// - symbolic refs are skipped, so only direct refs are mirrored;
/// - the target object must be readable;
/// - tag objects are additionally peeled with `peel_to_object_oid`, so the update
///   carries both the tag oid and the peeled oid;
/// - the target oid becomes a pack root, and a ref-update message is built with the
///   remote expectation from `remote_ref_expectations`. Refs absent from that map
///   are expected to be missing remotely.
///
/// All roots are then packed into one reachable Git pack.
///
/// # Errors
/// `ProtocolError::InvalidState` when any of these happen:
/// - listing refs fails;
/// - a ref target cannot be read or peeled;
/// - no direct refs remain;
/// - pack planning fails.
fn build_git_mirror_plan_from_sley(
    git_repo: &SleyRepository,
    chunk_size: usize,
    remote_ref_expectations: &HashMap<String, GitRefRemoteExpectation>,
) -> Result<GitLanePushPlan, ProtocolError> {
    let refs = git_repo
        .references()
        .list_refs()
        .map_err(|err| ProtocolError::InvalidState(format!("list git-overlay refs: {err}")))?;

    let mut roots: Vec<GitObjectId> = Vec::new();
    let mut ref_updates: Vec<PushMessage> = Vec::new();
    // NOTE(review): despite the name, this is simply the target of the first
    // mirrored ref in `list_refs()` order; nothing here orders refs by recency.
    let mut newest_root: Option<GitObjectId> = None;

    for reference in refs {
        if GitRefName::new(&reference.name).is_local_only() {
            continue;
        }

        let target_oid = match reference.target {
            ReferenceTarget::Direct(oid) => oid,
            ReferenceTarget::Symbolic(_) => continue,
        };

        // Every mirrored target must be readable locally; its object type is also
        // needed below to detect tags.
        let object = git_repo.read_object(&target_oid).map_err(|err| {
            ProtocolError::InvalidState(format!(
                "git-overlay ref {} target {} is not readable: {err}",
                reference.name,
                target_oid.to_hex()
            ))
        })?;

        // For tag objects, also send the oid the tag peels to.
        let peeled_oid = if object.object_type == sley::GitObjectType::Tag {
            let peeled = git_repo.peel_to_object_oid(target_oid).map_err(|err| {
                ProtocolError::InvalidState(format!(
                    "peel git-overlay tag ref {}: {err}",
                    reference.name
                ))
            })?;
            Some(peeled)
        } else {
            None
        };

        roots.push(target_oid);
        newest_root.get_or_insert(target_oid);

        // Refs the server did not report are expected to be absent remotely.
        let expectation = remote_ref_expectations
            .get(&reference.name)
            .cloned()
            .unwrap_or(GitRefRemoteExpectation::Missing);

        let kind = grpc_git_ref_kind(GitRefName::new(&reference.name).wire_kind());
        let mut message =
            git_ref_update_message(&reference.name, kind, target_oid, peeled_oid, None);
        apply_git_ref_expectation_value(&mut message, &expectation)?;
        ref_updates.push(message);
    }

    if roots.is_empty() {
        return Err(ProtocolError::InvalidState(
            "git-overlay repository has no direct refs to mirror".to_string(),
        ));
    }

    let pack = build_git_lane_multi_root_pack_plan(git_repo, roots, chunk_size)?;

    // NOTE(review): this labels the first root as a Git *commit* address even when
    // that ref targets a tag object — confirm this is intended.
    let local_revision_address = newest_root
        .map(|oid| RevisionAddress::git_commit(oid.to_hex()).to_string())
        .unwrap_or_default();

    Ok(GitLanePushPlan {
        local_revision_address,
        pack,
        ref_updates,
    })
}
1884
1885fn grpc_git_ref_kind(kind: ClassifiedGitRefKind) -> GrpcGitRefKind {
1886 match kind {
1887 ClassifiedGitRefKind::Branch => GrpcGitRefKind::Branch,
1888 ClassifiedGitRefKind::Tag => GrpcGitRefKind::Tag,
1889 ClassifiedGitRefKind::Note => GrpcGitRefKind::Note,
1890 ClassifiedGitRefKind::Other => GrpcGitRefKind::Other,
1891 }
1892}
1893
1894fn build_git_lane_multi_root_pack_plan(
1896 git_repo: &SleyRepository,
1897 roots: Vec<GitObjectId>,
1898 chunk_size: usize,
1899) -> Result<GitPackPushPlan, ProtocolError> {
1900 if roots.is_empty() {
1901 return Err(ProtocolError::InvalidState(
1902 "cannot plan a Git pack with no roots".to_string(),
1903 ));
1904 }
1905 let plan = git_repo
1906 .reachable_pack_plan()
1907 .roots(roots.iter().copied())
1908 .build()
1909 .map_err(|err| {
1910 ProtocolError::InvalidState(format!("plan reachable Git pack stream: {err}"))
1911 })?
1912 .ok_or_else(|| {
1913 ProtocolError::InvalidState("roots did not produce a reachable Git pack".to_string())
1914 })?;
1915 let pack_file = NamedTempFile::new().map_err(|err| {
1916 ProtocolError::InvalidState(format!("create Git pack tempfile: {err}"))
1917 })?;
1918 let prepared = plan
1919 .prepare_to_file(pack_file.path())
1920 .map_err(|err| {
1921 ProtocolError::InvalidState(format!("write reachable Git pack tempfile: {err}"))
1922 })?;
1923 let pack_size = prepared.summary.pack_size;
1924 let checksum = prepared.summary.checksum;
1925 if pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
1926 return Err(ProtocolError::InvalidState(format!(
1927 "Git pack exceeds maximum transfer size of {} bytes; multi-pack split for repos over this size is a follow-up (plan §B.2)",
1928 wire::MAX_RECEIVED_GIT_PACK_SIZE
1929 )));
1930 }
1931 let chunk_size = chunk_size.max(1) as u64;
1932 let chunk_count = pack_size.div_ceil(chunk_size);
1933 if chunk_count > u32::MAX as u64 {
1934 return Err(ProtocolError::InvalidState(
1935 "Git pack chunk count exceeds u32".to_string(),
1936 ));
1937 }
1938 let transfer_id = format!("git-pack:{}", checksum.to_hex());
1939 Ok(GitPackPushPlan {
1940 transfer_id,
1941 pack_id: checksum.as_bytes().to_vec(),
1942 pack_size,
1943 roots,
1944 pack_file: Arc::new(Mutex::new(pack_file)),
1945 })
1946}
1947
1948async fn send_git_pack_streaming_messages(
1949 tx: &mpsc::Sender<PushMessage>,
1950 pack: &GitPackPushPlan,
1951 chunk_size: usize,
1952 progress: &Progress,
1953) -> Result<(), ProtocolError> {
1954 let tx = tx.clone();
1955 let pack = pack.clone();
1956 let progress = progress.clone();
1957 tokio::task::spawn_blocking(move || {
1958 stream_git_pack_messages_blocking(tx, pack, chunk_size, progress)
1959 })
1960 .await
1961 .map_err(|err| ProtocolError::InvalidState(format!("Git pack streaming task failed: {err}")))?
1962}
1963
1964fn stream_git_pack_messages_blocking(
1965 tx: mpsc::Sender<PushMessage>,
1966 pack: GitPackPushPlan,
1967 chunk_size: usize,
1968 progress: Progress,
1969) -> Result<(), ProtocolError> {
1970 let mut writer = GitPackPushMessageWriter::new(
1971 tx,
1972 pack.transfer_id.clone(),
1973 pack.pack_id.clone(),
1974 pack.pack_size,
1975 chunk_size,
1976 progress,
1977 );
1978 let mut pack_file = pack.pack_file.lock().map_err(|err| {
1979 ProtocolError::InvalidState(format!("lock Git pack tempfile: {err}"))
1980 })?;
1981 pack_file
1982 .seek(SeekFrom::Start(0))
1983 .map_err(|err| ProtocolError::InvalidState(format!("rewind Git pack tempfile: {err}")))?;
1984 let streamed = io::copy(&mut pack_file.as_file_mut(), &mut writer).map_err(|err| {
1985 ProtocolError::InvalidState(format!("stream Git pack tempfile: {err}"))
1986 })?;
1987 if streamed != pack.pack_size {
1988 return Err(ProtocolError::InvalidState(format!(
1989 "Git pack stream changed while sending; expected {} bytes/{}, streamed {} bytes",
1990 pack.pack_size,
1991 hex::encode(&pack.pack_id),
1992 streamed
1993 )));
1994 }
1995 writer.finish()?;
1996 Ok(())
1997}
1998
/// `io::Write` sink that slices a Git pack byte stream into `GitPackTransfer`
/// push messages of at most `chunk_size` bytes and sends them on `tx`.
struct GitPackPushMessageWriter {
    /// Channel the chunk messages are sent on (via `blocking_send`).
    tx: mpsc::Sender<PushMessage>,
    /// Transfer id stamped on every chunk.
    transfer_id: String,
    /// Pack id bytes stamped on every chunk.
    pack_id: Vec<u8>,
    /// Planned total pack size; the stream may not go past it.
    pack_size: u64,
    /// Maximum payload per chunk; `new` enforces a minimum of 1.
    chunk_size: usize,
    /// Bytes accumulated for the next chunk that have not been sent yet.
    buffer: Vec<u8>,
    /// Number of pack bytes already sent, i.e. the offset of the next chunk.
    offset: u64,
    /// Index assigned to the next chunk sent.
    chunk_index: u32,
    /// Progress reporter updated after each sent chunk.
    progress: Progress,
    /// Last percentage reported, used to skip repeated identical phase updates.
    last_progress_pct: u64,
}
2016
impl GitPackPushMessageWriter {
    /// Creates a writer for a pack of `pack_size` bytes.
    ///
    /// A `chunk_size` of 0 is treated as 1, so every sent chunk carries at least
    /// one byte.
    fn new(
        tx: mpsc::Sender<PushMessage>,
        transfer_id: String,
        pack_id: Vec<u8>,
        pack_size: u64,
        chunk_size: usize,
        progress: Progress,
    ) -> Self {
        let chunk_size = chunk_size.max(1);
        Self {
            tx,
            transfer_id,
            pack_id,
            pack_size,
            chunk_size,
            buffer: Vec::with_capacity(chunk_size),
            offset: 0,
            chunk_index: 0,
            progress,
            // No real percentage equals u64::MAX, so the first report always fires.
            last_progress_pct: u64::MAX,
        }
    }

    /// Sends the buffered bytes, if any, as the next `GitPackTransfer` chunk.
    ///
    /// A chunk that would carry the stream past `pack_size` is rejected. The chunk
    /// that ends exactly at `pack_size` is flagged `is_final_chunk`. After sending,
    /// `offset` and `chunk_index` advance and progress is refreshed.
    ///
    /// This uses `blocking_send`, which tokio documents as panicking inside an async
    /// context; the caller in this file runs it under `spawn_blocking`.
    fn send_buffer(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let chunk = std::mem::take(&mut self.buffer);
        let next_offset = self.offset.checked_add(chunk.len() as u64).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "Git pack offset overflow")
        })?;
        if next_offset > self.pack_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "Git pack stream exceeded planned size {}; got at least {}",
                    self.pack_size, next_offset
                ),
            ));
        }
        let is_final_chunk = next_offset == self.pack_size;
        self.tx
            .blocking_send(git_lane_push_message(git_lane_transfer::Body::Pack(
                GitPackTransfer {
                    transfer_id: self.transfer_id.clone(),
                    offset: self.offset,
                    chunk_index: self.chunk_index,
                    is_final_chunk,
                    pack_size: self.pack_size,
                    pack_chunk: chunk.into(),
                    pack_id: self.pack_id.clone().into(),
                },
            )))
            .map_err(|_| {
                io::Error::new(io::ErrorKind::BrokenPipe, "push stream closed unexpectedly")
            })?;
        // Only advance bookkeeping once the chunk was actually handed to the channel.
        self.offset = next_offset;
        self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk index overflow")
        })?;
        self.report_upload_progress();
        Ok(())
    }

    /// Publishes "uploading X/Y bytes (N%)" as the progress phase.
    ///
    /// Only runs when progress is active and the integer percentage has changed
    /// since the last report.
    fn report_upload_progress(&mut self) {
        if !self.progress.is_active() {
            return;
        }
        // `max(1)` guards against dividing by zero for an empty pack.
        let pct = self.offset.saturating_mul(100) / self.pack_size.max(1);
        if pct == self.last_progress_pct {
            return;
        }
        self.last_progress_pct = pct;
        self.progress.set_phase(format!(
            "uploading {}/{} bytes ({pct}%)",
            self.offset, self.pack_size
        ));
    }

    /// Sends any remaining buffered bytes as the last chunk.
    ///
    /// Then checks that exactly `pack_size` bytes were sent in total. Consumes
    /// the writer.
    fn finish(mut self) -> Result<(), ProtocolError> {
        self.send_buffer().map_err(|err| {
            ProtocolError::InvalidState(format!("send final Git pack chunk: {err}"))
        })?;
        if self.offset != self.pack_size {
            return Err(ProtocolError::InvalidState(format!(
                "Git pack stream length mismatch: expected {}, got {}",
                self.pack_size, self.offset
            )));
        }
        Ok(())
    }
}
2114
impl Write for GitPackPushMessageWriter {
    /// Appends `buf` to the chunk buffer and sends each chunk as soon as it reaches
    /// `chunk_size`. On success, all of `buf` is reported as written.
    ///
    /// The overflow check here only fires once every byte up to `pack_size` has
    /// been sent. Bytes buffered beyond `pack_size` are rejected later by
    /// `send_buffer`.
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        let written = buf.len();
        while !buf.is_empty() {
            if self.offset == self.pack_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Git pack stream wrote past planned size {}", self.pack_size),
                ));
            }
            // Space left in the current chunk; the buffer never exceeds chunk_size.
            let capacity = self
                .chunk_size
                .checked_sub(self.buffer.len())
                .ok_or_else(|| {
                    io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk buffer overflow")
                })?;
            let take = capacity.min(buf.len());
            self.buffer.extend_from_slice(&buf[..take]);
            buf = &buf[take..];
            if self.buffer.len() == self.chunk_size {
                self.send_buffer()?;
            }
        }
        Ok(written)
    }

    /// No-op. A partially filled chunk is only sent by a later `write` that fills
    /// it or by `finish`.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
2145
2146fn git_ref_update_message(
2153 name: &str,
2154 kind: GrpcGitRefKind,
2155 target_oid: GitObjectId,
2156 peeled_oid: Option<GitObjectId>,
2157 checkpoint: Option<GitCheckpointTransfer>,
2158) -> PushMessage {
2159 git_lane_push_message(git_lane_transfer::Body::RefUpdate(GitRefUpdateTransfer {
2160 name: name.to_string(),
2161 kind: kind as i32,
2162 target_oid: target_oid.as_bytes().to_vec().into(),
2163 peeled_oid: peeled_oid
2164 .map(|oid| oid.as_bytes().to_vec())
2165 .unwrap_or_default()
2166 .into(),
2167 expected_missing: false,
2168 expected_target_oid: Vec::new().into(),
2169 checkpoint,
2170 }))
2171}
2172
2173fn apply_git_ref_expectation_value(
2174 message: &mut PushMessage,
2175 expectation: &GitRefRemoteExpectation,
2176) -> Result<(), ProtocolError> {
2177 let Some(push_message::Body::GitLane(GitLaneTransfer {
2178 body: Some(git_lane_transfer::Body::RefUpdate(update)),
2179 })) = message.body.as_mut()
2180 else {
2181 return Err(ProtocolError::InvalidState(
2182 "Git lane push plan missing ref update message".to_string(),
2183 ));
2184 };
2185 match expectation {
2186 GitRefRemoteExpectation::Missing => {
2187 update.expected_missing = true;
2188 update.expected_target_oid = Vec::new().into();
2189 }
2190 GitRefRemoteExpectation::Value(oid) => {
2191 update.expected_missing = false;
2192 update.expected_target_oid = oid.clone().into();
2193 }
2194 }
2195 Ok(())
2196}
2197
/// What the client expects a Git ref to be on the remote before updating it.
///
/// `apply_git_ref_expectation_value` turns this into the `expected_missing` and
/// `expected_target_oid` fields of a ref update.
#[derive(Clone, Debug, PartialEq, Eq)]
enum GitRefRemoteExpectation {
    /// The ref is expected not to exist on the remote.
    Missing,
    /// The ref is expected to point at this raw (hex-decoded) object id.
    Value(Vec<u8>),
}
2203
2204fn parse_git_ref_expectation(
2205 remote_revision_address: &str,
2206) -> Result<GitRefRemoteExpectation, ProtocolError> {
2207 if remote_revision_address.is_empty() {
2208 return Ok(GitRefRemoteExpectation::Missing);
2209 }
2210
2211 match remote_revision_address.parse::<RevisionAddress>() {
2212 Ok(RevisionAddress::Heddle(_)) => Ok(GitRefRemoteExpectation::Missing),
2213 Ok(RevisionAddress::GitCommit(oid)) => hex::decode(&oid)
2214 .map(GitRefRemoteExpectation::Value)
2215 .map_err(|err| {
2216 ProtocolError::InvalidState(format!(
2217 "server returned invalid Git remote_revision_address: {err}"
2218 ))
2219 }),
2220 Err(err) => Err(ProtocolError::InvalidState(format!(
2221 "server returned invalid remote_revision_address {remote_revision_address:?}: {err}"
2222 ))),
2223 }
2224}
2225
2226fn git_lane_push_message(body: git_lane_transfer::Body) -> PushMessage {
2227 PushMessage {
2228 body: Some(push_message::Body::GitLane(GitLaneTransfer {
2229 body: Some(body),
2230 })),
2231 }
2232}
2233
2234fn git_lane_transfer_size(transfer: &GitLaneTransfer) -> usize {
2235 match transfer.body.as_ref() {
2236 Some(git_lane_transfer::Body::Pack(pack)) => pack.pack_chunk.len(),
2237 Some(git_lane_transfer::Body::RefUpdate(update)) => update
2238 .target_oid
2239 .len()
2240 .saturating_add(update.peeled_oid.len())
2241 .saturating_add(update.expected_target_oid.len())
2242 .saturating_add(
2243 update
2244 .checkpoint
2245 .as_ref()
2246 .map(git_checkpoint_transfer_size)
2247 .unwrap_or_default(),
2248 ),
2249 Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2250 git_checkpoint_transfer_size(checkpoint)
2251 }
2252 None => 0,
2253 }
2254}
2255
2256fn git_checkpoint_transfer_size(checkpoint: &GitCheckpointTransfer) -> usize {
2257 checkpoint
2258 .heddle_change_id
2259 .len()
2260 .saturating_add(checkpoint.git_commit_oid.len())
2261 .saturating_add(checkpoint.thread.len())
2262 .saturating_add(checkpoint.metadata_json.len())
2263}
2264
/// Tracks the Git pack currently being received during a pull, if any.
#[derive(Default)]
struct GitPackPullInstallState {
    /// In-progress install; `None` before the first chunk and after a final chunk.
    active: Option<GitPackPullInstall>,
}
2269
impl GitPackPullInstallState {
    /// Errors if a pack transfer was started but its final chunk never arrived.
    fn ensure_idle(&self) -> Result<(), ProtocolError> {
        if self.active.is_none() {
            Ok(())
        } else {
            Err(ProtocolError::InvalidState(
                "Git pack transfer ended before final chunk".to_string(),
            ))
        }
    }

    /// Validates one incoming pack chunk and feeds it to the active install.
    ///
    /// Every chunk must have all of the following:
    /// - a non-empty `transfer_id`;
    /// - a `pack_size` within `wire::MAX_RECEIVED_GIT_PACK_SIZE`;
    /// - a non-empty payload;
    /// - a `pack_id` that parses as an object id in the repository's format.
    ///
    /// When no install is active, the chunk must start the transfer (offset 0,
    /// chunk index 0) and opens a raw pack install sized by `pack_size`. On the
    /// final chunk, the install is finished and the repository's objects are
    /// refreshed.
    fn receive_chunk(
        &mut self,
        git_repo: &SleyRepository,
        pack: GitPackTransfer,
    ) -> Result<(), ProtocolError> {
        if pack.transfer_id.is_empty() {
            return Err(ProtocolError::InvalidState(
                "Git pack transfer_id is required".to_string(),
            ));
        }
        if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
            return Err(ProtocolError::InvalidState(format!(
                "Git pack exceeds maximum transfer size of {} bytes",
                wire::MAX_RECEIVED_GIT_PACK_SIZE
            )));
        }
        if pack.pack_chunk.is_empty() {
            return Err(ProtocolError::InvalidState(
                "Git pack chunk must not be empty".to_string(),
            ));
        }
        let pack_id =
            GitObjectId::from_raw(git_repo.object_format(), &pack.pack_id).map_err(|err| {
                ProtocolError::InvalidState(format!("GitPackTransfer.pack_id: {err}"))
            })?;
        // No transfer in flight: this chunk must be the very first one.
        if self.active.is_none() {
            if pack.offset != 0 {
                return Err(ProtocolError::InvalidState(format!(
                    "Git pack offset mismatch: expected 0, got {}",
                    pack.offset
                )));
            }
            if pack.chunk_index != 0 {
                return Err(ProtocolError::InvalidState(format!(
                    "Git pack chunk index mismatch: expected 0, got {}",
                    pack.chunk_index
                )));
            }
            let writer = git_repo
                .objects()
                .begin_raw_pack_install(pack_id, pack.pack_size)
                .map_err(|err| {
                    ProtocolError::InvalidState(format!("begin Git pack install: {err}"))
                })?;
            self.active = Some(GitPackPullInstall {
                transfer_id: pack.transfer_id.clone(),
                pack_id,
                pack_size: pack.pack_size,
                next_offset: 0,
                next_chunk_index: 0,
                writer,
            });
        }

        // Continuity checks (transfer id, pack id/size, offset, chunk index) are
        // done by GitPackPullInstall::receive_chunk. If it errors, the install
        // stays in `active`.
        let active = self.active.as_mut().ok_or_else(|| {
            ProtocolError::InvalidState("Git pack install not active".to_string())
        })?;
        active.receive_chunk(&pack, pack_id)?;
        if pack.is_final_chunk {
            let active = self.active.take().ok_or_else(|| {
                ProtocolError::InvalidState("Git pack install not active".to_string())
            })?;
            active.finish()?;
            // Make the newly installed pack visible to subsequent object reads.
            git_repo.refresh_objects();
        }
        Ok(())
    }
}
2349
2350struct GitPackPullInstall {
2351 transfer_id: String,
2352 pack_id: GitObjectId,
2353 pack_size: u64,
2354 next_offset: u64,
2355 next_chunk_index: u32,
2356 writer: sley::plumbing::sley_odb::RawPackStreamingInstall,
2357}
2358
2359impl GitPackPullInstall {
2360 fn receive_chunk(
2361 &mut self,
2362 pack: &GitPackTransfer,
2363 pack_id: GitObjectId,
2364 ) -> Result<(), ProtocolError> {
2365 if self.transfer_id != pack.transfer_id {
2366 return Err(ProtocolError::InvalidState(format!(
2367 "Git pack transfer id changed from {:?} to {:?}",
2368 self.transfer_id, pack.transfer_id
2369 )));
2370 }
2371 if self.pack_id != pack_id {
2372 return Err(ProtocolError::InvalidState(
2373 "Git pack id changed during transfer".to_string(),
2374 ));
2375 }
2376 if self.pack_size != pack.pack_size {
2377 return Err(ProtocolError::InvalidState(
2378 "Git pack size changed during transfer".to_string(),
2379 ));
2380 }
2381 if pack.offset != self.next_offset {
2382 return Err(ProtocolError::InvalidState(format!(
2383 "Git pack offset mismatch: expected {}, got {}",
2384 self.next_offset, pack.offset
2385 )));
2386 }
2387 if pack.chunk_index != self.next_chunk_index {
2388 return Err(ProtocolError::InvalidState(format!(
2389 "Git pack chunk index mismatch: expected {}, got {}",
2390 self.next_chunk_index, pack.chunk_index
2391 )));
2392 }
2393 let chunk_len = u64::try_from(pack.pack_chunk.len()).map_err(|_| {
2394 ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
2395 })?;
2396 let next_offset = self
2397 .next_offset
2398 .checked_add(chunk_len)
2399 .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
2400 if next_offset > self.pack_size {
2401 return Err(ProtocolError::InvalidState(
2402 "Git pack chunk exceeds declared pack size".to_string(),
2403 ));
2404 }
2405 self.writer.write_all(&pack.pack_chunk).map_err(|err| {
2406 ProtocolError::InvalidState(format!("write streamed Git pack chunk: {err}"))
2407 })?;
2408 self.next_offset = next_offset;
2409 self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
2410 ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
2411 })?;
2412 if pack.is_final_chunk {
2413 if self.next_offset != self.pack_size {
2414 return Err(ProtocolError::InvalidState(format!(
2415 "Git pack final size mismatch: declared {}, received {}",
2416 self.pack_size, self.next_offset
2417 )));
2418 }
2419 } else if self.next_offset == self.pack_size {
2420 return Err(ProtocolError::InvalidState(
2421 "Git pack reached declared size without final chunk marker".to_string(),
2422 ));
2423 }
2424 Ok(())
2425 }
2426
2427 fn finish(self) -> Result<(), ProtocolError> {
2428 self.writer
2429 .finish()
2430 .map_err(|err| ProtocolError::InvalidState(format!("install Git pack: {err}")))?;
2431 Ok(())
2432 }
2433}
2434
2435fn accept_git_lane_pull_transfer(
2436 repo: &Repository,
2437 git_repo: &mut Option<SleyRepository>,
2438 git_pack_state: &mut GitPackPullInstallState,
2439 transfer: GitLaneTransfer,
2440) -> Result<(), ProtocolError> {
2441 if repo.capability() != RepositoryCapability::GitOverlay {
2442 return Err(ProtocolError::InvalidState(format!(
2443 "received git-lane pull transfer for non-GitOverlay repository (capability {:?})",
2444 repo.capability()
2445 )));
2446 }
2447 match transfer.body {
2448 Some(git_lane_transfer::Body::Pack(pack)) => {
2449 accept_git_lane_pack(repo, git_repo, git_pack_state, pack)
2450 }
2451 Some(git_lane_transfer::Body::RefUpdate(update)) => {
2452 accept_git_lane_ref_update(repo, git_repo, update)
2453 }
2454 Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2455 record_git_lane_checkpoint(repo, git_lane_sley_repository(repo, git_repo)?, checkpoint)
2456 }
2457 None => Err(ProtocolError::InvalidState(
2458 "GitLaneTransfer body is required".to_string(),
2459 )),
2460 }
2461}
2462
2463fn accept_git_lane_pack(
2464 repo: &Repository,
2465 git_repo: &mut Option<SleyRepository>,
2466 git_pack_state: &mut GitPackPullInstallState,
2467 pack: GitPackTransfer,
2468) -> Result<(), ProtocolError> {
2469 let git_repo = git_lane_sley_repository(repo, git_repo)?;
2470 git_pack_state.receive_chunk(git_repo, pack)?;
2471 Ok(())
2472}
2473
2474fn accept_git_lane_ref_update(
2484 repo: &Repository,
2485 git_repo: &mut Option<SleyRepository>,
2486 update: GitRefUpdateTransfer,
2487) -> Result<(), ProtocolError> {
2488 let git_repo = git_lane_sley_repository(repo, git_repo)?;
2489 let target = git_oid_from_bytes(
2490 git_repo,
2491 "GitRefUpdateTransfer.target_oid",
2492 &update.target_oid,
2493 )?;
2494 git_repo.read_commit(&target).map_err(|err| {
2495 ProtocolError::InvalidState(format!(
2496 "Git ref {} target commit {} is not present after pack receive: {err}",
2497 update.name,
2498 target.to_hex()
2499 ))
2500 })?;
2501 let refs = git_repo.references();
2502 let mut tx = refs.transaction();
2503 tx.update_to(
2504 update.name.clone(),
2505 ReferenceTarget::Direct(target),
2506 RefPrecondition::Any,
2507 None,
2508 );
2509 tx.commit().map_err(|err| {
2510 ProtocolError::InvalidState(format!("update Git ref {}: {err}", update.name))
2511 })?;
2512 if let Some(checkpoint) = update.checkpoint {
2513 record_git_lane_checkpoint(repo, git_repo, checkpoint)?;
2514 }
2515 Ok(())
2516}
2517
2518fn record_git_lane_checkpoint(
2519 repo: &Repository,
2520 git_repo: &SleyRepository,
2521 checkpoint: GitCheckpointTransfer,
2522) -> Result<(), ProtocolError> {
2523 let state = ChangeId::try_from_slice(&checkpoint.heddle_change_id)
2524 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2525 let commit_oid = git_oid_from_bytes(
2526 git_repo,
2527 "GitCheckpointTransfer.git_commit_oid",
2528 &checkpoint.git_commit_oid,
2529 )?;
2530 let commit_hex = commit_oid.to_hex();
2531 if repo
2532 .latest_git_checkpoint_for_change(&state)
2533 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2534 .is_some_and(|record| record.git_commit == commit_hex)
2535 {
2536 return Ok(());
2537 }
2538 let summary = serde_json::from_str::<serde_json::Value>(&checkpoint.metadata_json)
2539 .ok()
2540 .and_then(|metadata| {
2541 metadata
2542 .get("summary")
2543 .and_then(serde_json::Value::as_str)
2544 .map(str::to_string)
2545 })
2546 .unwrap_or_else(|| "pulled Git checkpoint".to_string());
2547 repo.record_git_checkpoint(&state, commit_hex, summary)
2548 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2549 Ok(())
2550}
2551
2552fn git_lane_sley_repository<'a>(
2553 repo: &Repository,
2554 git_repo: &'a mut Option<SleyRepository>,
2555) -> Result<&'a SleyRepository, ProtocolError> {
2556 if git_repo.is_none() {
2557 *git_repo = Some(
2558 repo.git_overlay_sley_repository()
2559 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2560 .ok_or_else(|| {
2561 ProtocolError::InvalidState(
2562 "git-overlay repository has no Git store".to_string(),
2563 )
2564 })?,
2565 );
2566 }
2567 match git_repo.as_ref() {
2568 Some(git_repo) => Ok(git_repo),
2569 None => Err(ProtocolError::InvalidState(
2570 "git-overlay repository has no Git store".to_string(),
2571 )),
2572 }
2573}
2574
2575fn git_oid_from_bytes(
2576 git_repo: &SleyRepository,
2577 field: &str,
2578 bytes: &[u8],
2579) -> Result<GitObjectId, ProtocolError> {
2580 GitObjectId::from_raw(git_repo.object_format(), bytes)
2581 .map_err(|err| ProtocolError::InvalidState(format!("{field}: {err}")))
2582}
2583
2584async fn send_native_pack_streaming_messages(
2585 tx: &mpsc::Sender<PushMessage>,
2586 repo: &Repository,
2587 objects: &[ObjectInfo],
2588 transfer_id: &str,
2589 chunk_size: usize,
2590 transport: &super::helpers::HostedTransportPolicy,
2591 transport_mode: TransportMode,
2592) -> Result<(), ProtocolError> {
2593 let object_count = u64::try_from(objects.len()).map_err(|_| {
2594 ProtocolError::InvalidState("native pack object count exceeds u64".to_string())
2595 })?;
2596 let mut writer = wire::NativePackStreamingWriter::new_in(repo.heddle_dir(), object_count)?;
2597 let mut pack_reader = wire::GrowingPackChunkReader::open(writer.pack_path(), chunk_size)?;
2598 let (loaded_tx, mut loaded_rx) = mpsc::channel::<(
2599 usize,
2600 Result<wire::ObjectData, ProtocolError>,
2601 )>(NATIVE_PACK_OBJECT_PREFETCH_LIMIT);
2602 let store = repo.store().clone();
2603 let object_plan = objects.to_vec();
2604 let loader = tokio::task::spawn_blocking(move || {
2605 load_native_pack_objects_parallel(store, object_plan, loaded_tx);
2606 });
2607
2608 let mut next_index = 0usize;
2609 let mut pending = BTreeMap::new();
2610 while next_index < objects.len() {
2611 let (index, object) = loaded_rx.recv().await.ok_or_else(|| {
2612 ProtocolError::InvalidState(
2613 "native pack object loader stopped before sending all objects".to_string(),
2614 )
2615 })?;
2616 pending.insert(index, object);
2617
2618 while let Some(object) = pending.remove(&next_index) {
2619 let object = object?;
2620 let should_drain = object.data.len() >= chunk_size
2621 || (next_index + 1).is_multiple_of(NATIVE_PACK_DRAIN_OBJECT_INTERVAL);
2622 writer.add_object_data(object)?;
2623 if should_drain {
2624 writer.flush_pack()?;
2625 drain_growing_native_pack_stream(
2626 tx,
2627 &mut pack_reader,
2628 false,
2629 PackStreamKind::Pack,
2630 transfer_id,
2631 transport,
2632 transport_mode,
2633 )
2634 .await?;
2635 }
2636 next_index += 1;
2637 }
2638 }
2639 loader.await.map_err(|err| {
2640 ProtocolError::InvalidState(format!("native pack object loader task failed: {err}"))
2641 })?;
2642
2643 let bundle = writer.finish()?;
2644 drain_growing_native_pack_stream(
2645 tx,
2646 &mut pack_reader,
2647 true,
2648 PackStreamKind::Pack,
2649 transfer_id,
2650 transport,
2651 transport_mode,
2652 )
2653 .await?;
2654 send_native_pack_file_stream(
2655 tx,
2656 &bundle.index_path,
2657 PackStreamKind::Index,
2658 transfer_id,
2659 chunk_size,
2660 transport,
2661 transport_mode,
2662 )
2663 .await
2664}
2665
2666fn load_native_pack_objects_parallel(
2667 store: AnyStore,
2668 objects: Vec<ObjectInfo>,
2669 tx: mpsc::Sender<(usize, Result<wire::ObjectData, ProtocolError>)>,
2670) {
2671 if objects.is_empty() {
2672 return;
2673 }
2674 let worker_count = native_pack_object_load_worker_count(objects.len());
2675 let objects = Arc::new(objects);
2676 let next_index = Arc::new(AtomicUsize::new(0));
2677
2678 std::thread::scope(|scope| {
2679 for _ in 0..worker_count {
2680 let store = store.clone();
2681 let objects = Arc::clone(&objects);
2682 let next_index = Arc::clone(&next_index);
2683 let tx = tx.clone();
2684 scope.spawn(move || {
2685 loop {
2686 let index = next_index.fetch_add(1, Ordering::Relaxed);
2687 let Some(info) = objects.get(index) else {
2688 break;
2689 };
2690 let object = wire::load_object_data(&store, &info.id, info.obj_type);
2691 if tx.blocking_send((index, object)).is_err() {
2692 break;
2693 }
2694 }
2695 });
2696 }
2697 });
2698}
2699
2700fn native_pack_object_load_worker_count(object_count: usize) -> usize {
2701 let available = std::thread::available_parallelism()
2702 .map(usize::from)
2703 .unwrap_or(1);
2704 object_count
2705 .min(available)
2706 .clamp(1, NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT)
2707}
2708
2709async fn drain_growing_native_pack_stream(
2710 tx: &mpsc::Sender<PushMessage>,
2711 reader: &mut wire::GrowingPackChunkReader,
2712 final_stream: bool,
2713 stream_kind: PackStreamKind,
2714 transfer_id: &str,
2715 transport: &super::helpers::HostedTransportPolicy,
2716 transport_mode: TransportMode,
2717) -> Result<(), ProtocolError> {
2718 while let Some((offset, chunk_index, data, is_final_chunk)) =
2719 reader.next_available_chunk(final_stream)?
2720 {
2721 send_pack_chunk(
2722 tx,
2723 stream_kind,
2724 data,
2725 transfer_id,
2726 transport,
2727 transport_mode,
2728 chunk_index,
2729 offset,
2730 is_final_chunk,
2731 )
2732 .await?;
2733 }
2734 Ok(())
2735}
2736
2737async fn send_native_pack_file_stream(
2738 tx: &mpsc::Sender<PushMessage>,
2739 path: &std::path::Path,
2740 stream_kind: PackStreamKind,
2741 transfer_id: &str,
2742 chunk_size: usize,
2743 transport: &super::helpers::HostedTransportPolicy,
2744 transport_mode: TransportMode,
2745) -> Result<(), ProtocolError> {
2746 let mut reader = wire::PackFileChunkReader::open(path, chunk_size)?;
2747 while let Some((offset, chunk_index, data, is_final_chunk)) = reader.next_chunk()? {
2748 send_pack_chunk(
2749 tx,
2750 stream_kind,
2751 data,
2752 transfer_id,
2753 transport,
2754 transport_mode,
2755 chunk_index,
2756 offset,
2757 is_final_chunk,
2758 )
2759 .await?;
2760 }
2761 Ok(())
2762}
2763
2764#[allow(clippy::too_many_arguments)]
2765async fn send_pack_chunk(
2766 tx: &mpsc::Sender<PushMessage>,
2767 stream_kind: PackStreamKind,
2768 data: Vec<u8>,
2769 transfer_id: &str,
2770 transport: &super::helpers::HostedTransportPolicy,
2771 transport_mode: TransportMode,
2772 chunk_index: u32,
2773 offset: u64,
2774 is_final_chunk: bool,
2775) -> Result<(), ProtocolError> {
2776 let chunk_length = data.len().min(u32::MAX as usize) as u32;
2777 tx.send(PushMessage {
2778 body: Some(push_message::Body::Pack(PackChunk {
2779 stream_kind: stream_kind as i32,
2780 data: data.into(),
2781 transfer: Some(transport.transfer_checkpoint_with_mode(
2782 transfer_id,
2783 transport_mode,
2784 chunk_index,
2785 offset,
2786 is_final_chunk,
2787 )),
2788 chunk_length,
2789 is_final_chunk,
2790 })),
2791 })
2792 .await
2793 .map_err(|_| ProtocolError::InvalidState("push stream closed unexpectedly".to_string()))
2794}
2795
2796fn preferred_transport_mode(
2797 transport: &super::helpers::HostedTransportPolicy,
2798 object_count: usize,
2799) -> TransportMode {
2800 let _ = transport;
2801 let _ = object_count;
2802 TransportMode::NativePack
2803}
2804
2805#[cfg(test)]
2806mod tests {
2807 use std::collections::HashSet;
2808
2809 use chrono::{TimeZone, Utc};
2810 use cli_shared::ClientConfig;
2811 use grpc::heddle::v1::{
2812 ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
2813 TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
2814 repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
2815 };
2816 use objects::{
2817 object::{
2818 Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
2819 StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
2820 },
2821 store::ObjectStore,
2822 };
2823 use tempfile::TempDir;
2824 use tonic::{Response, Status, transport::Server};
2825 use wire::{ObjectId, ObjectInfo};
2826
2827 use super::*;
2828 use crate::grpc_hosted::helpers::{descriptor_id_from_info, to_proto_object_info};
2829
2830 fn temp_repo() -> (TempDir, Repository) {
2831 let dir = TempDir::new().expect("tempdir");
2832 let repo = Repository::init_default(dir.path()).expect("init repo");
2833 (dir, repo)
2834 }
2835
2836 fn temp_repo_unseeded() -> (TempDir, Repository) {
2840 let dir = TempDir::new().expect("tempdir");
2841 let repo = Repository::init(dir.path()).expect("init repo");
2842 (dir, repo)
2843 }
2844
2845 fn redaction_info(blob: ContentHash) -> ObjectInfo {
2846 ObjectInfo {
2847 id: ObjectId::Hash(blob),
2848 obj_type: ObjectType::Redaction,
2849 size: 0,
2850 delta_base: None,
2851 }
2852 }
2853
2854 fn state_info(state: ChangeId) -> ObjectInfo {
2855 ObjectInfo {
2856 id: ObjectId::ChangeId(state),
2857 obj_type: ObjectType::State,
2858 size: 0,
2859 delta_base: None,
2860 }
2861 }
2862
2863 fn state_visibility_info(state: ChangeId) -> ObjectInfo {
2864 ObjectInfo {
2865 id: ObjectId::ChangeId(state),
2866 obj_type: ObjectType::StateVisibility,
2867 size: 0,
2868 delta_base: None,
2869 }
2870 }
2871
2872 fn sample_blob() -> ContentHash {
2873 ContentHash::from_bytes([7u8; 32])
2874 }
2875
2876 #[test]
2877 fn descriptor_id_from_info_matches_proto_encode_path() {
2878 let infos = [
2879 redaction_info(sample_blob()),
2880 state_info(ChangeId::from_bytes([3u8; 16])),
2881 state_visibility_info(ChangeId::from_bytes([9u8; 16])),
2882 ];
2883 for info in infos {
2884 assert_eq!(
2885 descriptor_id_from_info(&info),
2886 descriptor_id(&to_proto_object_info(&info)),
2887 "keying path must stay byte-identical to the throwaway-encode path",
2888 );
2889 }
2890 }
2891
2892 #[test]
2893 fn git_lane_messages_pack_reachable_commit_graph_and_attach_checkpoint() {
2894 let dir = TempDir::new().expect("tempdir");
2895 let git = sley::Repository::init(dir.path()).expect("init git");
2896 let blob_oid = git.write_blob(b"hello\n").expect("write blob");
2897 let tree = sley::TreeObject {
2898 entries: vec![sley::plumbing::sley_object::TreeEntry {
2899 mode: 0o100644,
2900 name: sley::BString::from(b"hello.txt"),
2901 oid: blob_oid,
2902 }],
2903 };
2904 let tree_oid = git
2905 .write_raw_object(sley::GitObjectType::Tree, tree.write())
2906 .expect("write tree");
2907 let commit = sley::CommitObject {
2908 tree: tree_oid,
2909 parents: Vec::new(),
2910 author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2911 committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
2912 encoding: None,
2913 message: b"checkpoint\n".to_vec(),
2914 };
2915 let commit_oid = git
2916 .write_raw_object(sley::GitObjectType::Commit, commit.write())
2917 .expect("write commit");
2918
2919 let pack = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64 * 1024)
2920 .expect("build git lane pack plan");
2921 assert_eq!(pack.pack_id.len(), git.object_format().raw_len());
2922 let (tx, mut rx) = mpsc::channel(8);
2923 stream_git_pack_messages_blocking(tx, pack.clone(), 64 * 1024, Progress::null())
2924 .expect("stream git lane pack");
2925 let mut pack_bytes = Vec::new();
2926 let mut chunks = Vec::new();
2927 while let Some(chunk) = rx.blocking_recv() {
2928 let Some(push_message::Body::GitLane(GitLaneTransfer {
2929 body: Some(git_lane_transfer::Body::Pack(pack)),
2930 })) = chunk.body
2931 else {
2932 panic!("expected git pack chunk");
2933 };
2934 pack_bytes.extend_from_slice(&pack.pack_chunk);
2935 chunks.push(pack);
2936 }
2937 assert!(!chunks.is_empty());
2938 assert!(chunks.last().expect("pack chunk").is_final_chunk);
2939 assert_eq!(pack_bytes.len() as u64, pack.pack_size);
2940 let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
2941 .expect("generated pack should index");
2942 assert_eq!(indexed.pack_id.as_bytes(), pack.pack_id.as_slice());
2943 assert_eq!(indexed.objects.len(), 3);
2944
2945 let state = ChangeId::from_bytes([9u8; 16]);
2946 let ref_message = git_ref_update_message(
2947 "refs/heads/main",
2948 GrpcGitRefKind::Branch,
2949 commit_oid,
2950 None,
2951 Some(GitCheckpointTransfer {
2952 heddle_change_id: state.as_bytes().to_vec().into(),
2953 git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2954 thread: "main".to_string(),
2955 metadata_json: String::new(),
2956 }),
2957 );
2958 let Some(push_message::Body::GitLane(GitLaneTransfer {
2959 body: Some(git_lane_transfer::Body::RefUpdate(update)),
2960 })) = ref_message.body
2961 else {
2962 panic!("expected git ref update message");
2963 };
2964 assert_eq!(update.name, "refs/heads/main");
2965 assert_eq!(update.kind, GrpcGitRefKind::Branch as i32);
2966 assert_eq!(update.target_oid.as_ref(), commit_oid.as_bytes());
2967 let checkpoint = update.checkpoint.expect("checkpoint");
2968 assert_eq!(checkpoint.heddle_change_id.as_ref(), state.as_bytes());
2969 assert_eq!(checkpoint.git_commit_oid.as_ref(), commit_oid.as_bytes());
2970 assert_eq!(checkpoint.thread, "main");
2971 }
2972
2973 fn sample_ref_update_message(commit_oid: GitObjectId) -> PushMessage {
2974 git_ref_update_message(
2975 "refs/heads/main",
2976 GrpcGitRefKind::Branch,
2977 commit_oid,
2978 None,
2979 Some(GitCheckpointTransfer {
2980 heddle_change_id: ChangeId::from_bytes([9u8; 16]).as_bytes().to_vec().into(),
2981 git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2982 thread: "main".to_string(),
2983 metadata_json: String::new(),
2984 }),
2985 )
2986 }
2987
2988 fn write_commit(git: &sley::Repository, seed: &str) -> GitObjectId {
2991 let blob_oid = git
2992 .write_blob(format!("content-{seed}\n").as_bytes())
2993 .expect("write blob");
2994 let tree = sley::TreeObject {
2995 entries: vec![sley::plumbing::sley_object::TreeEntry {
2996 mode: 0o100644,
2997 name: sley::BString::from(format!("{seed}.txt").into_bytes()),
2998 oid: blob_oid,
2999 }],
3000 };
3001 let tree_oid = git
3002 .write_raw_object(sley::GitObjectType::Tree, tree.write())
3003 .expect("write tree");
3004 let commit = sley::CommitObject {
3005 tree: tree_oid,
3006 parents: Vec::new(),
3007 author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
3008 committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
3009 encoding: None,
3010 message: format!("commit {seed}\n").into_bytes(),
3011 };
3012 git.write_raw_object(sley::GitObjectType::Commit, commit.write())
3013 .expect("write commit")
3014 }
3015
3016 fn set_ref(git: &mut sley::Repository, name: &str, target: GitObjectId) {
3017 let refs = git.references();
3018 let mut tx = refs.transaction();
3019 tx.update_to(
3020 name.to_string(),
3021 ReferenceTarget::Direct(target),
3022 RefPrecondition::Any,
3023 None,
3024 );
3025 tx.commit().expect("commit ref");
3026 }
3027
3028 fn mirror_ref_updates(plan: &GitLanePushPlan) -> Vec<GitRefUpdateTransfer> {
3029 plan.ref_updates
3030 .iter()
3031 .map(git_ref_update_from_message)
3032 .cloned()
3033 .collect()
3034 }
3035
3036 #[test]
3039 fn git_lane_reachable_pack_plan_matches_legacy_writer_checksum() {
3040 let dir = TempDir::new().expect("tempdir");
3041 let git = sley::Repository::init(dir.path()).expect("init git");
3042 let commit_oid = write_commit(&git, "byte-identity");
3043
3044 let pack_plan = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64 * 1024)
3045 .expect("build pack plan");
3046
3047 let mut legacy_pack = Vec::new();
3048 let legacy = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
3049 git.objects().as_ref(),
3050 git.object_format(),
3051 std::iter::once(commit_oid),
3052 &HashSet::new(),
3053 &mut legacy_pack,
3054 )
3055 .expect("legacy reachable pack")
3056 .expect("legacy pack summary");
3057
3058 assert_eq!(pack_plan.pack_id, legacy.checksum.as_bytes().to_vec());
3059 assert_eq!(pack_plan.pack_size, legacy.pack_size);
3060 assert_eq!(pack_plan.pack_size, legacy_pack.len() as u64);
3061
3062 let mut planned_pack = Vec::new();
3063 pack_plan
3064 .pack_file
3065 .lock()
3066 .expect("lock pack tempfile")
3067 .seek(SeekFrom::Start(0))
3068 .expect("rewind pack tempfile");
3069 io::copy(
3070 &mut pack_plan
3071 .pack_file
3072 .lock()
3073 .expect("lock pack tempfile")
3074 .as_file_mut(),
3075 &mut planned_pack,
3076 )
3077 .expect("read planned pack");
3078 assert_eq!(planned_pack, legacy_pack);
3079 }
3080
3081 #[test]
3085 fn git_mirror_plan_builds_one_pack_and_checkpointless_updates_for_all_refs() {
3086 let dir = TempDir::new().expect("tempdir");
3087 let mut git = sley::Repository::init(dir.path()).expect("init git");
3088
3089 let main_commit = write_commit(&git, "main");
3090 let feature_commit = write_commit(&git, "feature");
3091 let tagged_commit = write_commit(&git, "tagged");
3092
3093 set_ref(&mut git, "refs/heads/main", main_commit);
3094 set_ref(&mut git, "refs/heads/feature", feature_commit);
3095
3096 let tag = sley::TagObject {
3098 object: tagged_commit,
3099 object_type: sley::GitObjectType::Commit,
3100 name: b"v1".to_vec(),
3101 tagger: Some(b"Tester <test@example.com> 1700000000 +0000".to_vec()),
3102 message: b"release v1\n".to_vec(),
3103 raw_body: None,
3104 };
3105 let tag_oid = git
3106 .write_raw_object(sley::GitObjectType::Tag, tag.write())
3107 .expect("write tag");
3108 set_ref(&mut git, "refs/tags/v1", tag_oid);
3109
3110 let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3111 .expect("build mirror plan");
3112
3113 let updates = mirror_ref_updates(&plan);
3114 assert_eq!(updates.len(), 3, "one ref update per direct ref");
3115
3116 assert!(
3119 updates.iter().all(|u| u.checkpoint.is_none()),
3120 "mirror mode signals via checkpoint: None on every ref update",
3121 );
3122
3123 let by_name: HashMap<&str, &GitRefUpdateTransfer> =
3124 updates.iter().map(|u| (u.name.as_str(), u)).collect();
3125
3126 let main = by_name["refs/heads/main"];
3127 assert_eq!(main.kind, GrpcGitRefKind::Branch as i32);
3128 assert_eq!(main.target_oid.as_ref(), main_commit.as_bytes());
3129 assert!(main.peeled_oid.is_empty(), "commit refs are not peeled");
3130
3131 let feature = by_name["refs/heads/feature"];
3132 assert_eq!(feature.kind, GrpcGitRefKind::Branch as i32);
3133 assert_eq!(feature.target_oid.as_ref(), feature_commit.as_bytes());
3134
3135 let tag_update = by_name["refs/tags/v1"];
3136 assert_eq!(tag_update.kind, GrpcGitRefKind::Tag as i32);
3137 assert_eq!(tag_update.target_oid.as_ref(), tag_oid.as_bytes());
3138 assert_eq!(
3139 tag_update.peeled_oid.as_ref(),
3140 tagged_commit.as_bytes(),
3141 "annotated tag ref is peeled to its underlying commit",
3142 );
3143
3144 let mut pack_roots: Vec<GitObjectId> = plan.pack.roots.clone();
3146 pack_roots.sort_by_key(|oid| oid.to_hex());
3147 let mut expected_roots = vec![main_commit, feature_commit, tag_oid];
3148 expected_roots.sort_by_key(|oid| oid.to_hex());
3149 assert_eq!(
3150 pack_roots, expected_roots,
3151 "pack roots == resolved ref targets",
3152 );
3153
3154 let (tx, mut rx) = mpsc::channel(64);
3157 stream_git_pack_messages_blocking(tx, plan.pack.clone(), 64 * 1024, Progress::null())
3158 .expect("stream mirror pack");
3159 let mut pack_bytes = Vec::new();
3160 while let Some(message) = rx.blocking_recv() {
3161 if let Some(push_message::Body::GitLane(GitLaneTransfer {
3162 body: Some(git_lane_transfer::Body::Pack(pack)),
3163 })) = message.body
3164 {
3165 pack_bytes.extend_from_slice(&pack.pack_chunk);
3166 }
3167 }
3168 assert_eq!(
3169 pack_bytes.len() as u64,
3170 plan.pack.pack_size,
3171 "streamed pack size must match plan",
3172 );
3173 let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
3174 .expect("mirror pack indexes");
3175 let packed: HashSet<Vec<u8>> = indexed
3176 .objects
3177 .iter()
3178 .map(|obj| obj.oid.as_bytes().to_vec())
3179 .collect();
3180 for oid in [main_commit, feature_commit, tagged_commit, tag_oid] {
3181 assert!(
3182 packed.contains(oid.as_bytes()),
3183 "pack must contain {}",
3184 oid.to_hex()
3185 );
3186 }
3187 }
3188
3189 #[test]
3192 fn git_mirror_plan_skips_symbolic_refs() {
3193 let dir = TempDir::new().expect("tempdir");
3194 let mut git = sley::Repository::init(dir.path()).expect("init git");
3195 let main_commit = write_commit(&git, "main");
3196 set_ref(&mut git, "refs/heads/main", main_commit);
3197 {
3200 let refs = git.references();
3201 let mut tx = refs.transaction();
3202 tx.update_to(
3203 "HEAD".to_string(),
3204 ReferenceTarget::Symbolic("refs/heads/main".to_string()),
3205 RefPrecondition::Any,
3206 None,
3207 );
3208 tx.commit().expect("set HEAD");
3209 }
3210
3211 let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3212 .expect("build mirror plan");
3213 let updates = mirror_ref_updates(&plan);
3214 assert_eq!(updates.len(), 1, "only the direct ref is pushed");
3215 assert_eq!(updates[0].name, "refs/heads/main");
3216 }
3217
3218 #[test]
3221 fn git_mirror_plan_applies_per_ref_remote_expectations() {
3222 let dir = TempDir::new().expect("tempdir");
3223 let mut git = sley::Repository::init(dir.path()).expect("init git");
3224 let main_commit = write_commit(&git, "main");
3225 let feature_commit = write_commit(&git, "feature");
3226 set_ref(&mut git, "refs/heads/main", main_commit);
3227 set_ref(&mut git, "refs/heads/feature", feature_commit);
3228
3229 let remote_oid = "89abcdef012345670123456789abcdef01234567";
3230 let mut expectations = HashMap::new();
3231 expectations.insert(
3232 "refs/heads/main".to_string(),
3233 GitRefRemoteExpectation::Value(hex::decode(remote_oid).expect("hex")),
3234 );
3235 let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &expectations)
3238 .expect("build mirror plan");
3239 let updates = mirror_ref_updates(&plan);
3240 let by_name: HashMap<&str, &GitRefUpdateTransfer> =
3241 updates.iter().map(|u| (u.name.as_str(), u)).collect();
3242
3243 let main = by_name["refs/heads/main"];
3244 assert!(!main.expected_missing);
3245 assert_eq!(hex::encode(main.expected_target_oid.as_ref()), remote_oid);
3246
3247 let feature = by_name["refs/heads/feature"];
3248 assert!(
3249 feature.expected_missing,
3250 "ref absent from ListRefs is expected-missing (create)",
3251 );
3252 }
3253
3254 #[test]
3258 fn git_mirror_plan_includes_pull_request_refs() {
3259 let dir = TempDir::new().expect("tempdir");
3260 let mut git = sley::Repository::init(dir.path()).expect("init git");
3261 let pr_commit = write_commit(&git, "pr");
3262 set_ref(&mut git, "refs/pull/42/head", pr_commit);
3263
3264 let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3265 .expect("build mirror plan");
3266 let updates = mirror_ref_updates(&plan);
3267 let pull = updates
3268 .iter()
3269 .find(|update| update.name == "refs/pull/42/head")
3270 .expect("refs/pull/* ref must be mirrored");
3271 assert_eq!(
3272 pull.kind,
3273 GrpcGitRefKind::Other as i32,
3274 "refs/pull/* classifies as Other",
3275 );
3276 assert_eq!(pull.target_oid.as_ref(), pr_commit.as_bytes());
3277 assert!(
3278 pull.checkpoint.is_none(),
3279 "mirror ref updates are checkpoint-less",
3280 );
3281 assert!(pull.peeled_oid.is_empty(), "a commit ref is not peeled");
3282 }
3283
3284 #[test]
3289 fn git_mirror_plan_excludes_local_only_refs_and_keeps_content() {
3290 let dir = TempDir::new().expect("tempdir");
3291 let mut git = sley::Repository::init(dir.path()).expect("init git");
3292
3293 let main_commit = write_commit(&git, "main");
3294 let notes_commit = write_commit(&git, "notes");
3295 let stash_commit = write_commit(&git, "stash");
3296 let remote_commit = write_commit(&git, "remote");
3297 let original_commit = write_commit(&git, "original");
3298 let replace_commit = write_commit(&git, "replace");
3299
3300 set_ref(&mut git, "refs/heads/main", main_commit);
3302 set_ref(&mut git, "refs/notes/heddle", notes_commit);
3303
3304 set_ref(&mut git, "refs/stash", stash_commit);
3306 set_ref(&mut git, "refs/remotes/origin/main", remote_commit);
3307 set_ref(&mut git, "refs/original/refs/heads/main", original_commit);
3308 set_ref(&mut git, "refs/replace/deadbeef", replace_commit);
3309
3310 let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
3311 .expect("build mirror plan");
3312 let names: Vec<&str> = plan
3313 .ref_updates
3314 .iter()
3315 .map(|m| git_ref_update_from_message(m).name.as_str())
3316 .collect();
3317
3318 assert!(
3319 names.contains(&"refs/heads/main"),
3320 "content branch is mirrored: {names:?}",
3321 );
3322 assert!(
3323 names.contains(&"refs/notes/heddle"),
3324 "heddle state-note ref is mirrored: {names:?}",
3325 );
3326 for excluded in [
3327 "refs/stash",
3328 "refs/remotes/origin/main",
3329 "refs/original/refs/heads/main",
3330 "refs/replace/deadbeef",
3331 ] {
3332 assert!(
3333 !names.contains(&excluded),
3334 "local-only ref {excluded} must NOT be mirrored: {names:?}",
3335 );
3336 }
3337 assert_eq!(names.len(), 2, "exactly the two content refs ship: {names:?}");
3338 }
3339
/// A dangling local-only ref must not break mirror planning.
///
/// `refs/original/refs/heads/main` is pointed at an object id that was never
/// written to the repository. Because local-only refs are skipped, the plan
/// must still build and contain only `refs/heads/main`.
#[test]
fn git_mirror_plan_ignores_dangling_local_only_ref() {
    let dir = TempDir::new().expect("tempdir");
    let mut git = sley::Repository::init(dir.path()).expect("init git");

    let head_commit = write_commit(&git, "main");
    set_ref(&mut git, "refs/heads/main", head_commit);

    // An object id that no write in this test ever produced.
    let dangling_oid = GitObjectId::from_hex(
        sley::ObjectFormat::Sha1,
        "0123456789abcdef0123456789abcdef01234567",
    )
    .expect("oid");
    set_ref(&mut git, "refs/original/refs/heads/main", dangling_oid);

    let plan = build_git_mirror_plan_from_sley(&git, 64 * 1024, &HashMap::new())
        .expect("dangling local-only ref must not fail the mirror plan");

    let mut names: Vec<&str> = Vec::new();
    for message in plan.ref_updates.iter() {
        names.push(git_ref_update_from_message(message).name.as_str());
    }
    assert_eq!(names, vec!["refs/heads/main"], "only content ships: {names:?}");
}
3369
/// An empty remote expectation string means "the ref must not exist yet".
///
/// Applying the parsed `""` expectation must set `expected_missing` and leave
/// `expected_target_oid` empty.
#[test]
fn git_ref_expectation_marks_missing_when_remote_has_no_git_revision() {
    let local_oid = GitObjectId::from_hex(
        sley::ObjectFormat::Sha1,
        "0123456789abcdef0123456789abcdef01234567",
    )
    .expect("oid");
    let mut message = sample_ref_update_message(local_oid);

    let expectation = parse_git_ref_expectation("").expect("missing expectation");
    apply_git_ref_expectation_value(&mut message, &expectation).expect("apply expectation");

    let GitRefUpdateTransfer {
        expected_missing,
        expected_target_oid,
        ..
    } = git_ref_update_from_message(&message);
    assert!(*expected_missing);
    assert!(expected_target_oid.is_empty());
}
3385
/// A `git:<oid>` remote expectation is copied into the ref update.
///
/// After applying it, the update must not expect a missing ref, and its
/// `expected_target_oid` must hex-encode back to the remote oid.
#[test]
fn git_ref_expectation_uses_remote_git_revision_oid() {
    let local_oid = GitObjectId::from_hex(
        sley::ObjectFormat::Sha1,
        "0123456789abcdef0123456789abcdef01234567",
    )
    .expect("oid");
    let remote_oid = "89abcdef012345670123456789abcdef01234567";
    let mut message = sample_ref_update_message(local_oid);

    let raw_expectation = format!("git:{remote_oid}");
    let expectation = parse_git_ref_expectation(&raw_expectation).expect("git expectation");
    apply_git_ref_expectation_value(&mut message, &expectation).expect("apply expectation");

    let update = git_ref_update_from_message(&message);
    assert!(!update.expected_missing);
    let encoded_target = hex::encode(update.expected_target_oid.as_ref());
    assert_eq!(encoded_target, remote_oid);
}
3403
/// `GitPackPushMessageWriter` splits written bytes into ordered chunks.
///
/// Ten bytes are written through a writer built with `10` and `4`. From the
/// assertions, the second value acts as the chunk size. The result must be
/// three git-lane pack messages (4 + 4 + 2 bytes) with increasing offsets and
/// indices. Only the last one is flagged final, and all carry the same pack id.
#[test]
fn git_pack_stream_writer_emits_ordered_chunks() {
    let (tx, mut rx) = mpsc::channel(4);
    let mut writer = GitPackPushMessageWriter::new(
        tx,
        "git-pack:test".to_string(),
        vec![0x42; 20],
        10,
        4,
        Progress::null(),
    );
    writer.write_all(b"abcdefghij").expect("write pack bytes");
    writer.finish().expect("finish pack stream");

    // Collect every queued message; each one must be a git-lane pack chunk.
    let mut chunks = Vec::new();
    while let Some(message) = rx.blocking_recv() {
        match message.body {
            Some(push_message::Body::GitLane(GitLaneTransfer {
                body: Some(git_lane_transfer::Body::Pack(pack)),
            })) => chunks.push(pack),
            _ => panic!("expected git pack chunk"),
        }
    }

    // (offset, chunk_index, is_final_chunk, payload) per expected chunk.
    let expected: [(u64, u32, bool, &[u8]); 3] = [
        (0, 0, false, &b"abcd"[..]),
        (4, 1, false, &b"efgh"[..]),
        (8, 2, true, &b"ij"[..]),
    ];
    assert_eq!(chunks.len(), expected.len());
    for (chunk, (offset, index, is_final, payload)) in chunks.iter().zip(expected) {
        assert_eq!(chunk.offset, offset);
        assert_eq!(chunk.chunk_index, index);
        assert_eq!(chunk.is_final_chunk, is_final);
        assert_eq!(chunk.pack_chunk.to_vec(), payload.to_vec());
    }
    assert!(
        chunks
            .iter()
            .all(|chunk| chunk.pack_id.as_ref() == &[0x42; 20][..])
    );
}
3448
3449 #[derive(Default)]
3452 struct PhaseCapturingSink {
3453 phases: std::sync::Mutex<Vec<String>>,
3454 }
3455
3456 impl PhaseCapturingSink {
3457 fn phases(&self) -> Vec<String> {
3458 self.phases.lock().unwrap().clone()
3459 }
3460 }
3461
3462 impl objects::Sink for PhaseCapturingSink {
3463 fn render(&self, snap: objects::ProgressSnapshot) {
3464 self.phases.lock().unwrap().push(snap.phase);
3465 }
3466 }
3467
/// Streaming a git pack must report an `uploading` progress phase, and the
/// last such phase must mention the full pack size in bytes.
#[test]
fn git_pack_stream_reports_upload_progress() {
    /// Forwards rendered snapshots to a shared capturing sink, so the test
    /// keeps a handle to the sink after `Progress` takes ownership of this box.
    struct Forward(std::sync::Arc<PhaseCapturingSink>);

    impl objects::Sink for Forward {
        fn render(&self, snap: objects::ProgressSnapshot) {
            self.0.render(snap);
        }
    }

    let dir = TempDir::new().expect("tempdir");
    let git = sley::Repository::init(dir.path()).expect("init git");
    let commit_oid = write_commit(&git, "progress");
    let pack = build_git_lane_multi_root_pack_plan(&git, vec![commit_oid], 64)
        .expect("build pack plan");

    let sink = std::sync::Arc::new(PhaseCapturingSink::default());
    let progress = Progress::with_sink(Box::new(Forward(std::sync::Arc::clone(&sink))));

    let (tx, mut rx) = mpsc::channel(1024);
    stream_git_pack_messages_blocking(tx, pack.clone(), 64, progress.clone())
        .expect("stream pack");
    // `tx` was moved into the streamer. Drain whatever it queued until every
    // sender is gone.
    while rx.blocking_recv().is_some() {}

    let phases = sink.phases();
    assert!(
        phases.iter().any(|phase| phase.contains("uploading")),
        "pack streamer must drive an 'uploading' progress phase; saw {phases:?}",
    );
    let last_uploading = phases
        .iter()
        .rfind(|phase| phase.contains("uploading"))
        .cloned();
    let expected_size = pack.pack_size.to_string();
    assert!(
        last_uploading
            .as_deref()
            .is_some_and(|phase| phase.contains(&expected_size)),
        "final uploading phase must show the full pack size ({} bytes); saw {last_uploading:?}",
        pack.pack_size,
    );
}
3512
/// Feeding a pack to `GitPackPullInstallState` in 7-byte chunks installs its
/// objects into the destination sley repository.
#[test]
fn git_pack_pull_install_state_streams_pack_into_sley_store() {
    // Deliberately tiny and odd so the pack is split across many chunks.
    const CHUNK_SIZE: usize = 7;

    let source_dir = TempDir::new().expect("source tempdir");
    let source = sley::Repository::init(source_dir.path()).expect("init source git");
    let blob_oid = source.write_blob(b"streamed pull pack\n").expect("blob");
    let pack = sley::plumbing::sley_odb::build_reachable_pack(
        source.objects().as_ref(),
        source.object_format(),
        [blob_oid],
        &HashSet::new(),
    )
    .expect("build pack")
    .expect("reachable pack");
    let pack_len = pack.pack.len() as u64;

    let dest_dir = TempDir::new().expect("dest tempdir");
    let dest = sley::Repository::init(dest_dir.path()).expect("init dest git");
    let mut state = GitPackPullInstallState::default();

    let mut offset = 0u64;
    for (chunk_index, chunk) in pack.pack.chunks(CHUNK_SIZE).enumerate() {
        let end = offset + chunk.len() as u64;
        let transfer = GitPackTransfer {
            transfer_id: "git-pack:test".to_string(),
            offset,
            chunk_index: chunk_index as u32,
            is_final_chunk: end == pack_len,
            pack_size: pack_len,
            pack_chunk: chunk.to_vec().into(),
            pack_id: pack.checksum.as_bytes().to_vec().into(),
        };
        state.receive_chunk(&dest, transfer).expect("receive chunk");
        offset = end;
    }

    state.ensure_idle().expect("stream should finish");
    let installed = dest.read_object(&blob_oid).expect("read installed object");
    assert_eq!(installed.body.as_slice(), b"streamed pull pack\n");
}
3555
/// Git-lane pull data aimed at a repository without the GitOverlay
/// capability must be rejected with `ProtocolError::InvalidState`, not
/// silently dropped.
#[test]
fn accept_git_lane_pull_transfer_errors_for_non_overlay_repo() {
    let (_dir, repo) = temp_repo();
    assert_ne!(
        repo.capability(),
        RepositoryCapability::GitOverlay,
        "init_default repo must be non-GitOverlay for this guard test",
    );

    // An empty chunk already flagged final. Its payload does not matter to
    // this guard.
    let empty_final_chunk = GitPackTransfer {
        transfer_id: "git-pack:test".to_string(),
        offset: 0,
        chunk_index: 0,
        is_final_chunk: true,
        pack_size: 0,
        pack_chunk: Vec::new().into(),
        pack_id: Vec::new().into(),
    };
    let transfer = GitLaneTransfer {
        body: Some(git_lane_transfer::Body::Pack(empty_final_chunk)),
    };

    let mut git_repo = None;
    let mut install_state = GitPackPullInstallState::default();
    let outcome = accept_git_lane_pull_transfer(&repo, &mut git_repo, &mut install_state, transfer);
    let err = outcome
        .expect_err("git-lane pull to a non-GitOverlay repo must fail loud, not silently drop");
    assert!(
        matches!(err, ProtocolError::InvalidState(_)),
        "expected InvalidState protocol error, got {err:?}",
    );
}
3584
3585 fn git_ref_update_from_message(message: &PushMessage) -> &GitRefUpdateTransfer {
3586 let Some(push_message::Body::GitLane(GitLaneTransfer {
3587 body: Some(git_lane_transfer::Body::RefUpdate(update)),
3588 })) = message.body.as_ref()
3589 else {
3590 panic!("expected git ref update message");
3591 };
3592 update
3593 }
3594
3595 fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
3596 let hex = hash.to_hex();
3597 let (prefix, rest) = hex.split_at(2);
3598 repo.heddle_dir()
3599 .join("objects")
3600 .join("trees")
3601 .join(prefix)
3602 .join(rest)
3603 }
3604
3605 fn sample_redaction(blob: ContentHash) -> Redaction {
3606 Redaction {
3607 redacted_blob: blob,
3608 state: ChangeId::from_bytes([1u8; 16]),
3609 path: "config/secrets.toml".into(),
3610 reason: "leaked credential".into(),
3611 redactor: Principal {
3612 name: "Grace Hopper".into(),
3613 email: "grace@example.com".into(),
3614 },
3615 redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
3616 signature: None,
3617 purged_at: None,
3618 supersedes: None,
3619 }
3620 }
3621
3622 fn sample_state_visibility(state: ChangeId) -> StateVisibility {
3623 StateVisibility {
3624 state,
3625 tier: VisibilityTier::Restricted {
3626 scope_label: "security-embargo".into(),
3627 },
3628 embargo_until: None,
3629 declarer: Principal {
3630 name: "Grace Hopper".into(),
3631 email: "grace@example.com".into(),
3632 },
3633 declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
3634 signature: None,
3635 supersedes: None,
3636 }
3637 }
3638
/// Every object type excluded from native packs must be routed through the
/// out-of-pack (sidecar) transfer partition. Otherwise it could never be
/// transferred at all.
#[test]
fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
    for &obj_type in wire::native_pack_excluded_object_types() {
        let routed_as_sidecar =
            wire::TransferPartitions::<ObjectInfo>::is_sidecar_object_type(obj_type);
        assert!(
            routed_as_sidecar,
            "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
        );
    }
}
3648
/// Pins when a pull needs a native pack.
///
/// Plans holding only a StateVisibility sidecar or only a redaction need no
/// pack. A packable blob, or a State even alongside its sidecar, does. An
/// empty plan needs one when the full closure was requested.
#[test]
fn native_pack_required_tracks_packable_pull_wants() {
    let blob = sample_blob();
    let state = ChangeId::from_bytes([9u8; 16]);

    // Out-of-pack objects on their own: no native pack.
    assert!(!native_pack_required_for_pull(
        false,
        &RepositoryTransferPlan::from_object_infos(
            vec![state_visibility_info(state)],
            GitLaneTransferIntent::HeddleObjectsOnly,
        ),
    ));
    assert!(!native_pack_required_for_pull(
        false,
        &RepositoryTransferPlan::from_object_infos(
            vec![redaction_info(blob)],
            GitLaneTransferIntent::HeddleObjectsOnly,
        ),
    ));

    // A packable blob forces the pack.
    assert!(native_pack_required_for_pull(
        false,
        &RepositoryTransferPlan::from_object_infos(
            vec![ObjectInfo {
                id: ObjectId::Hash(blob),
                obj_type: ObjectType::Blob,
                size: 0,
                delta_base: None,
            }],
            GitLaneTransferIntent::HeddleObjectsOnly,
        ),
    ));

    // A State still needs the pack when its sidecar (same ChangeId) rides along.
    assert!(native_pack_required_for_pull(
        false,
        &RepositoryTransferPlan::from_object_infos(
            vec![state_info(state), state_visibility_info(state)],
            GitLaneTransferIntent::HeddleObjectsOnly,
        ),
    ));

    // Asking for the full closure requires the pack even with nothing planned.
    assert!(native_pack_required_for_pull(
        true,
        &RepositoryTransferPlan::from_object_infos(
            Vec::<ObjectInfo>::new(),
            GitLaneTransferIntent::HeddleObjectsOnly,
        ),
    ));
}
3688
/// A State and its StateVisibility sidecar share one ChangeId. Planning must
/// record both object types under that single want entry, and the plan must
/// still require a native pack.
#[test]
fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
    let (_dir, repo) = temp_repo();
    let state = ChangeId::from_bytes([9u8; 16]);

    let advertised = vec![
        object_descriptor_with_status(
            &state_info(state),
            ObjectAvailabilityStatus::Missing,
            "missing state",
        ),
        object_descriptor_with_status(
            &state_visibility_info(state),
            ObjectAvailabilityStatus::Missing,
            "missing state visibility",
        ),
    ];
    let want_plan =
        plan_pull_wants(&repo, &state, false, advertised, false).expect("plan pull wants");

    let recorded_types = want_plan
        .wanted_types
        .get(&PackObjectId::ChangeId(state))
        .expect("same ChangeId want entry");
    assert_eq!(
        recorded_types.as_slice(),
        &[ObjectType::State, ObjectType::StateVisibility]
    );

    let needs_pack =
        native_pack_required_for_pull(want_plan.want_full_closure, &want_plan.transfer_plan);
    assert!(needs_pack);
}
3726
/// Scripted `RepoSyncService` used by the sidecar-only pull tests.
///
/// Only `pull` does real work. It advertises a single missing StateVisibility
/// sidecar for `state` and checks that the client wants exactly that object
/// (without the full closure). It then sends the sidecar followed by a
/// successful completion. No native pack chunk is ever sent.
#[derive(Clone)]
struct SidecarOnlyPullService {
    /// Change id reported as the remote state and as the sidecar's state.
    state: ChangeId,
    /// Encoded StateVisibility blob forwarded verbatim in the transfer message.
    state_visibility_blob: Vec<u8>,
}

#[tonic::async_trait]
impl RepoSyncService for SidecarOnlyPullService {
    /// Not exercised by these tests; always answers with an empty ref listing.
    async fn list_refs(
        &self,
        _request: tonic::Request<ListRefsRequest>,
    ) -> Result<Response<ListRefsResponse>, Status> {
        Ok(Response::new(ListRefsResponse::default()))
    }

    /// Not exercised by these tests; always answers with a default response.
    async fn update_ref(
        &self,
        _request: tonic::Request<UpdateRefRequest>,
    ) -> Result<Response<UpdateRefResponse>, Status> {
        Ok(Response::new(UpdateRefResponse::default()))
    }

    type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

    /// Not exercised by these tests. The sender half is dropped when this
    /// function returns, so the response stream ends without yielding anything.
    async fn push(
        &self,
        _request: tonic::Request<tonic::Streaming<PushMessage>>,
    ) -> Result<Response<Self::PushStream>, Status> {
        let (_tx, rx) = mpsc::channel(1);
        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }

    type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

    /// Runs one scripted pull conversation on a spawned task.
    ///
    /// The sequence is: expect `Request`, send `Ready`, expect a sidecar-only
    /// `Want`, send `StateVisibility`, send `Complete`. An unexpected inbound
    /// message is answered with `invalid_argument` and ends the conversation.
    /// A failed send (receiver gone) ends it silently.
    async fn pull(
        &self,
        request: tonic::Request<tonic::Streaming<PullMessage>>,
    ) -> Result<Response<Self::PullStream>, Status> {
        let state = self.state;
        let state_visibility_blob = self.state_visibility_blob.clone();
        let (tx, rx) = mpsc::channel(4);

        tokio::spawn(async move {
            let mut inbound = request.into_inner();
            // Step 1: the client must open with a pull request.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Request(_)),
                })) => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected pull request, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 2: advertise exactly one missing object, the sidecar, with
            // no full closure on offer.
            let descriptor = object_descriptor_with_status(
                &state_visibility_info(state),
                ObjectAvailabilityStatus::Missing,
                "missing state visibility",
            );
            let ready = PullMessage {
                body: Some(pull_message::Body::Ready(PullReady {
                    remote_state: state.to_string_full(),
                    objects_to_fetch: vec![descriptor],
                    transfer: None,
                    partial_fetch_status: PartialFetchStatus::Disabled as i32,
                    missing_objects: Vec::new(),
                    full_closure_available: false,
                    object_count: 1,
                    remote_revision_address: RevisionAddress::heddle(state).to_string(),
                })),
            };
            if tx.send(Ok(ready)).await.is_err() {
                return;
            }

            // Step 3: accept only a want for that single `state_visibility`
            // object, without the full closure.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Want(want)),
                })) if !want.want_full_closure
                    && want.objects.len() == 1
                    && want.objects[0].object_type == "state_visibility" => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected sidecar-only want, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 4: send the sidecar out of pack. No pack chunk precedes it.
            let transfer = PullMessage {
                body: Some(pull_message::Body::StateVisibility(
                    StateVisibilityTransfer {
                        state_id: state.to_string_full(),
                        state_visibility_blob: state_visibility_blob.into(),
                    },
                )),
            };
            if tx.send(Ok(transfer)).await.is_err() {
                return;
            }

            // Step 5: report success with a completed transfer checkpoint.
            let complete = PullMessage {
                body: Some(pull_message::Body::Complete(GrpcPullComplete {
                    success: true,
                    new_state: state.to_string_full(),
                    error: String::new(),
                    transfer: Some(TransferCheckpoint {
                        transfer_id: "sidecar-only-test".to_string(),
                        transport_mode: TransportMode::NativePack as i32,
                        resume_offset: 0,
                        chunk_index: 0,
                        checkpoint: b"heddle-markers-v1\n".to_vec(),
                        is_complete: true,
                    }),
                    new_revision_address: RevisionAddress::heddle(state).to_string(),
                })),
            };
            let _ = tx.send(Ok(complete)).await;
        });

        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }
}
3860
3861 async fn connect_sidecar_only_service(
3862 service: SidecarOnlyPullService,
3863 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3864 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3865 Ok(listener) => listener,
3866 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3867 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3868 return None;
3869 }
3870 Err(err) => panic!("bind test server: {err}"),
3871 };
3872 let addr = listener.local_addr().expect("local addr");
3873 let incoming = futures::stream::unfold(listener, |listener| async {
3874 match listener.accept().await {
3875 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3876 Err(err) => Some((Err(err), listener)),
3877 }
3878 });
3879
3880 let handle = tokio::spawn(async move {
3881 Server::builder()
3882 .add_service(RepoSyncServiceServer::new(service))
3883 .serve_with_incoming(incoming)
3884 .await
3885 .expect("serve sidecar-only test service");
3886 });
3887
3888 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3889 .await
3890 .expect("connect client");
3891 Some((client, handle))
3892 }
3893
3894 #[tokio::test]
3895 async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
3896 let (_dir, repo) = temp_repo();
3897 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3898 let state = State::new_snapshot(
3899 tree_hash,
3900 vec![],
3901 Attribution::human(Principal {
3902 name: "Grace Hopper".into(),
3903 email: "grace@example.com".into(),
3904 }),
3905 );
3906 let state_id = state.change_id;
3907 repo.store().put_state(&state).expect("put state");
3908 assert!(
3909 repo.get_state_visibility_bytes_for_state(&state_id)
3910 .expect("load local sidecar")
3911 .is_none(),
3912 "test starts with state present and StateVisibility sidecar absent"
3913 );
3914
3915 let state_visibility_blob =
3916 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3917 .encode()
3918 .expect("encode state visibility blob");
3919 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3920 state: state_id,
3921 state_visibility_blob,
3922 })
3923 .await
3924 else {
3925 return;
3926 };
3927
3928 let exchange = tokio::time::timeout(
3929 Duration::from_secs(5),
3930 client.pull_exchange(
3931 &repo,
3932 "owner/repo",
3933 "main",
3934 PullOptions {
3935 local_thread: None,
3936 depth: None,
3937 target_state: Some(state_id),
3938 materialization: PullMaterialization::Full,
3939 },
3940 ),
3941 )
3942 .await
3943 .expect("sidecar-only pull must not hang waiting for native pack")
3944 .expect("sidecar-only pull succeeds");
3945 server.abort();
3946
3947 assert!(exchange.result.success);
3948 assert_eq!(exchange.object_count, 0);
3949 assert_eq!(exchange.profile.pack_bytes_received, 0);
3950 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3951 assert!(
3952 repo.get_state_visibility_for_state(&state_id)
3953 .expect("load accepted sidecar")
3954 .has_record(),
3955 "pull must accept the out-of-pack StateVisibility sidecar"
3956 );
3957 }
3958
3959 #[tokio::test]
3963 async fn legitimate_large_sidecar_blob_decodes_and_installs() {
3964 let (_dir, repo) = temp_repo();
3965 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3966 let state = State::new_snapshot(
3967 tree_hash,
3968 vec![],
3969 Attribution::human(Principal {
3970 name: "Grace Hopper".into(),
3971 email: "grace@example.com".into(),
3972 }),
3973 );
3974 let state_id = state.change_id;
3975 repo.store().put_state(&state).expect("put state");
3976
3977 let mut record = sample_state_visibility(state_id);
3980 record.tier = VisibilityTier::Restricted {
3981 scope_label: "x".repeat(8 * 1024 * 1024),
3982 };
3983 let state_visibility_blob = StateVisibilityBlob::new(vec![record])
3984 .encode()
3985 .expect("encode large state visibility blob");
3986 assert!(
3987 state_visibility_blob.len() > 4 * 1024 * 1024,
3988 "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
3989 );
3990 assert!(
3991 (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
3992 "blob must stay within the legitimate sidecar receive cap"
3993 );
3994
3995 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3996 state: state_id,
3997 state_visibility_blob,
3998 })
3999 .await
4000 else {
4001 return;
4002 };
4003
4004 let exchange = tokio::time::timeout(
4005 Duration::from_secs(30),
4006 client.pull_exchange(
4007 &repo,
4008 "owner/repo",
4009 "main",
4010 PullOptions {
4011 local_thread: None,
4012 depth: None,
4013 target_state: Some(state_id),
4014 materialization: PullMaterialization::Full,
4015 },
4016 ),
4017 )
4018 .await
4019 .expect("large-sidecar pull must not hang")
4020 .expect("large but legitimate sidecar pull succeeds");
4021 server.abort();
4022
4023 assert!(exchange.result.success);
4024 assert!(
4025 repo.get_state_visibility_for_state(&state_id)
4026 .expect("load accepted sidecar")
4027 .has_record(),
4028 "pull must accept a legitimately-large StateVisibility sidecar"
4029 );
4030 }
4031
4032 #[tokio::test]
4036 async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
4037 let (_dir, repo) = temp_repo();
4038 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
4039 let state = State::new_snapshot(
4040 tree_hash,
4041 vec![],
4042 Attribution::human(Principal {
4043 name: "Grace Hopper".into(),
4044 email: "grace@example.com".into(),
4045 }),
4046 );
4047 let state_id = state.change_id;
4048 repo.store().put_state(&state).expect("put state");
4049
4050 let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
4053 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
4054 state: state_id,
4055 state_visibility_blob: oversized,
4056 })
4057 .await
4058 else {
4059 return;
4060 };
4061
4062 let result = tokio::time::timeout(
4063 Duration::from_secs(30),
4064 client.pull_exchange(
4065 &repo,
4066 "owner/repo",
4067 "main",
4068 PullOptions {
4069 local_thread: None,
4070 depth: None,
4071 target_state: Some(state_id),
4072 materialization: PullMaterialization::Full,
4073 },
4074 ),
4075 )
4076 .await
4077 .expect("oversized-sidecar pull must not hang");
4078 server.abort();
4079
4080 let err = match result {
4081 Err(err) => err,
4082 Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
4083 };
4084 let message = err.to_string();
4085 assert!(
4086 !message.contains("exceeds receive size limit"),
4087 "rejection must come from the decode-size limit, before the post-decode check: {message}"
4088 );
4089 assert!(
4090 repo.get_state_visibility_for_state(&state_id)
4091 .expect("load sidecar")
4092 .latest()
4093 .expect("resolve visibility")
4094 .is_none(),
4095 "an oversized sidecar must never be installed"
4096 );
4097 }
4098
/// Scripted `RepoSyncService` for the state-plus-sidecar pull test.
///
/// Its `pull` handler advertises a missing State and that State's
/// StateVisibility sidecar (same ChangeId). It requires a want for both, then
/// streams the prebuilt native pack, the out-of-pack sidecar and a successful
/// completion, in that order.
#[derive(Clone)]
struct StateAndVisibilityPullService {
    /// Change id of the advertised State (and of its sidecar).
    state: ChangeId,
    /// Prebuilt native pack containing the State, streamed as pack and index chunks.
    pack_bundle: wire::NativePackBundle,
    /// Encoded StateVisibility blob forwarded verbatim in the sidecar transfer.
    state_visibility_blob: Vec<u8>,
}

#[tonic::async_trait]
impl RepoSyncService for StateAndVisibilityPullService {
    /// Not exercised by this test; always answers with an empty ref listing.
    async fn list_refs(
        &self,
        _request: tonic::Request<ListRefsRequest>,
    ) -> Result<Response<ListRefsResponse>, Status> {
        Ok(Response::new(ListRefsResponse::default()))
    }

    /// Not exercised by this test; always answers with a default response.
    async fn update_ref(
        &self,
        _request: tonic::Request<UpdateRefRequest>,
    ) -> Result<Response<UpdateRefResponse>, Status> {
        Ok(Response::new(UpdateRefResponse::default()))
    }

    type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

    /// Not exercised by this test. The sender half is dropped when this
    /// function returns, so the response stream ends without yielding anything.
    async fn push(
        &self,
        _request: tonic::Request<tonic::Streaming<PushMessage>>,
    ) -> Result<Response<Self::PushStream>, Status> {
        let (_tx, rx) = mpsc::channel(1);
        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }

    type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

    /// Runs one scripted pull conversation on a spawned task.
    ///
    /// The sequence is: expect `Request`, send `Ready` (State and sidecar),
    /// expect a `Want` for both, stream the native pack chunks, send
    /// `StateVisibility`, send `Complete`. An unexpected inbound message is
    /// answered with `invalid_argument` and ends the conversation. A failed
    /// send (receiver gone) ends it silently.
    async fn pull(
        &self,
        request: tonic::Request<tonic::Streaming<PullMessage>>,
    ) -> Result<Response<Self::PullStream>, Status> {
        let state = self.state;
        let pack_bundle = self.pack_bundle.clone();
        let state_visibility_blob = self.state_visibility_blob.clone();
        let (tx, rx) = mpsc::channel(8);

        tokio::spawn(async move {
            let mut inbound = request.into_inner();
            // Step 1: the client must open with a pull request.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Request(_)),
                })) => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected pull request, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 2: advertise the missing State and its sidecar, which
            // share the same ChangeId.
            let ready = PullMessage {
                body: Some(pull_message::Body::Ready(PullReady {
                    remote_state: state.to_string_full(),
                    objects_to_fetch: vec![
                        object_descriptor_with_status(
                            &state_info(state),
                            ObjectAvailabilityStatus::Missing,
                            "missing state",
                        ),
                        object_descriptor_with_status(
                            &state_visibility_info(state),
                            ObjectAvailabilityStatus::Missing,
                            "missing state visibility",
                        ),
                    ],
                    transfer: None,
                    partial_fetch_status: PartialFetchStatus::Disabled as i32,
                    missing_objects: Vec::new(),
                    full_closure_available: false,
                    object_count: 2,
                    remote_revision_address: RevisionAddress::heddle(state).to_string(),
                })),
            };
            if tx.send(Ok(ready)).await.is_err() {
                return;
            }

            // Step 3: the want must name exactly two objects, one `state`
            // and one `state_visibility`, without the full closure.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Want(want)),
                })) if !want.want_full_closure
                    && want.objects.len() == 2
                    && want
                        .objects
                        .iter()
                        .any(|object| object.object_type == "state")
                    && want
                        .objects
                        .iter()
                        .any(|object| object.object_type == "state_visibility") => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected state + sidecar wants, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 4: stream the native pack (pack chunks, then index chunks)
            // in 16-byte pieces.
            for message in
                encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
            {
                if tx.send(Ok(message)).await.is_err() {
                    return;
                }
            }

            // Step 5: send the sidecar out of pack, after the pack.
            let transfer = PullMessage {
                body: Some(pull_message::Body::StateVisibility(
                    StateVisibilityTransfer {
                        state_id: state.to_string_full(),
                        state_visibility_blob: state_visibility_blob.into(),
                    },
                )),
            };
            if tx.send(Ok(transfer)).await.is_err() {
                return;
            }

            // Step 6: report success with a completed transfer checkpoint.
            let complete = PullMessage {
                body: Some(pull_message::Body::Complete(GrpcPullComplete {
                    success: true,
                    new_state: state.to_string_full(),
                    error: String::new(),
                    transfer: Some(TransferCheckpoint {
                        transfer_id: "state-and-visibility-test".to_string(),
                        transport_mode: TransportMode::NativePack as i32,
                        resume_offset: 0,
                        chunk_index: 0,
                        checkpoint: b"heddle-markers-v1\n".to_vec(),
                        is_complete: true,
                    }),
                    new_revision_address: RevisionAddress::heddle(state).to_string(),
                })),
            };
            let _ = tx.send(Ok(complete)).await;
        });

        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }
}
4255
4256 fn encode_pull_native_pack_messages(
4257 bundle: &wire::NativePackBundle,
4258 transfer_id: &str,
4259 chunk_size: usize,
4260 ) -> Vec<PullMessage> {
4261 let mut messages = Vec::new();
4262 let chunk_size = chunk_size.max(1);
4263
4264 let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
4265 for chunk_index in 0..pack_total_chunks.max(1) {
4266 let Some((start, len)) =
4267 wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
4268 else {
4269 break;
4270 };
4271 messages.push(PullMessage {
4272 body: Some(pull_message::Body::Pack(PackChunk {
4273 stream_kind: PackStreamKind::Pack as i32,
4274 data: bundle.pack_data[start..start + len].to_vec().into(),
4275 transfer: Some(TransferCheckpoint {
4276 transfer_id: transfer_id.to_string(),
4277 transport_mode: TransportMode::NativePack as i32,
4278 resume_offset: start as u64,
4279 chunk_index: chunk_index as u32,
4280 checkpoint: Vec::new(),
4281 is_complete: chunk_index + 1 == pack_total_chunks,
4282 }),
4283 chunk_length: len as u32,
4284 is_final_chunk: chunk_index + 1 == pack_total_chunks,
4285 })),
4286 });
4287 }
4288
4289 let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
4290 for chunk_index in 0..index_total_chunks.max(1) {
4291 let Some((start, len)) =
4292 wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
4293 else {
4294 break;
4295 };
4296 messages.push(PullMessage {
4297 body: Some(pull_message::Body::Pack(PackChunk {
4298 stream_kind: PackStreamKind::Index as i32,
4299 data: bundle.index_data[start..start + len].to_vec().into(),
4300 transfer: Some(TransferCheckpoint {
4301 transfer_id: transfer_id.to_string(),
4302 transport_mode: TransportMode::NativePack as i32,
4303 resume_offset: start as u64,
4304 chunk_index: chunk_index as u32,
4305 checkpoint: Vec::new(),
4306 is_complete: chunk_index + 1 == index_total_chunks,
4307 }),
4308 chunk_length: len as u32,
4309 is_final_chunk: chunk_index + 1 == index_total_chunks,
4310 })),
4311 });
4312 }
4313
4314 messages
4315 }
4316
4317 async fn connect_state_and_visibility_service(
4318 service: StateAndVisibilityPullService,
4319 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4320 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4321 Ok(listener) => listener,
4322 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4323 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4324 return None;
4325 }
4326 Err(err) => panic!("bind test server: {err}"),
4327 };
4328 let addr = listener.local_addr().expect("local addr");
4329 let incoming = futures::stream::unfold(listener, |listener| async {
4330 match listener.accept().await {
4331 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4332 Err(err) => Some((Err(err), listener)),
4333 }
4334 });
4335
4336 let handle = tokio::spawn(async move {
4337 Server::builder()
4338 .add_service(RepoSyncServiceServer::new(service))
4339 .serve_with_incoming(incoming)
4340 .await
4341 .expect("serve state-and-visibility test service");
4342 });
4343
4344 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4345 .await
4346 .expect("connect client");
4347 Some((client, handle))
4348 }
4349
4350 #[tokio::test]
4351 async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
4352 let (_source_dir, source_repo) = temp_repo();
4353 let (_target_dir, target_repo) = temp_repo();
4354 let tree_hash = source_repo
4355 .store()
4356 .put_tree(&Tree::new())
4357 .expect("put tree");
4358 let state = State::new_snapshot(
4359 tree_hash,
4360 vec![],
4361 Attribution::human(Principal {
4362 name: "Grace Hopper".into(),
4363 email: "grace@example.com".into(),
4364 }),
4365 );
4366 let state_id = state.change_id;
4367 source_repo
4368 .store()
4369 .put_state(&state)
4370 .expect("put source state");
4371 let state_visibility_blob =
4372 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
4373 .encode()
4374 .expect("encode state visibility blob");
4375 source_repo
4376 .accept_wire_state_visibility(state_id, &state_visibility_blob)
4377 .expect("put source state visibility");
4378 let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
4379 .expect("build state pack");
4380
4381 assert!(
4382 target_repo
4383 .store()
4384 .get_state(&state_id)
4385 .expect("load target state")
4386 .is_none(),
4387 "test starts with state absent"
4388 );
4389 assert!(
4390 target_repo
4391 .get_state_visibility_bytes_for_state(&state_id)
4392 .expect("load target sidecar")
4393 .is_none(),
4394 "test starts with StateVisibility sidecar absent"
4395 );
4396
4397 let Some((mut client, server)) =
4398 connect_state_and_visibility_service(StateAndVisibilityPullService {
4399 state: state_id,
4400 pack_bundle,
4401 state_visibility_blob,
4402 })
4403 .await
4404 else {
4405 return;
4406 };
4407
4408 let exchange = tokio::time::timeout(
4409 Duration::from_secs(5),
4410 client.pull_exchange(
4411 &target_repo,
4412 "owner/repo",
4413 "main",
4414 PullOptions {
4415 local_thread: None,
4416 depth: None,
4417 target_state: Some(state_id),
4418 materialization: PullMaterialization::Full,
4419 },
4420 ),
4421 )
4422 .await
4423 .expect("state + sidecar pull must not hang waiting for native pack")
4424 .expect("state + sidecar pull succeeds");
4425 server.abort();
4426
4427 assert!(exchange.result.success);
4428 assert_eq!(exchange.object_count, 1);
4429 assert!(exchange.profile.pack_bytes_received > 0);
4430 assert_eq!(exchange.profile.object_mix.states, 1);
4431 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
4432 assert!(
4433 target_repo
4434 .store()
4435 .get_state(&state_id)
4436 .expect("load installed state")
4437 .is_some(),
4438 "native pack must install the State"
4439 );
4440 assert!(
4441 target_repo
4442 .get_state_visibility_for_state(&state_id)
4443 .expect("load accepted sidecar")
4444 .has_record(),
4445 "pull must accept the out-of-pack StateVisibility sidecar"
4446 );
4447 }
4448
4449 #[test]
4450 fn missing_blobs_in_tree_treats_absent_tree_as_empty() {
4451 let (_dir, repo) = temp_repo();
4452 let absent_tree = ContentHash::from_bytes([99u8; 32]);
4453
4454 let missing = wire::missing_blobs_in_tree(repo.store(), absent_tree)
4455 .expect("absent tree is not an error");
4456
4457 assert!(missing.is_empty());
4458 }
4459
4460 #[test]
4461 fn missing_blobs_in_tree_reports_only_genuinely_missing_blobs() {
4462 let (_dir, repo) = temp_repo();
4463 let present_blob = Blob::from("already local");
4464 let present_hash = repo.store().put_blob(&present_blob).expect("put blob");
4465 let missing_hash = ContentHash::from_bytes([42u8; 32]);
4466 let tree = Tree::from_entries(vec![
4467 TreeEntry::file("local.txt", present_hash, false).expect("present entry"),
4468 TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry"),
4469 ]);
4470 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4471
4472 let missing =
4473 wire::missing_blobs_in_tree(repo.store(), tree_hash).expect("collect missing blobs");
4474
4475 assert_eq!(missing, vec![missing_hash]);
4476 }
4477
4478 #[test]
4479 fn missing_blobs_in_tree_reports_corrupt_tree_read() {
4480 let (_dir, repo) = temp_repo();
4481 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
4482 std::fs::write(loose_tree_path(&repo, &tree_hash), [0xc1]).expect("corrupt tree");
4483 repo.store().clear_recent_caches();
4484
4485 let err = wire::missing_blobs_in_tree(repo.store(), tree_hash)
4486 .expect_err("corrupt tree must fail");
4487
4488 assert!(matches!(err, ProtocolError::InvalidState(_)));
4489 assert!(
4490 err.to_string().contains(&format!(
4491 "load tree {} while collecting lazy hydration missing blobs",
4492 tree_hash.to_hex()
4493 )),
4494 "unexpected error: {err}"
4495 );
4496 }
4497
4498 #[test]
4499 fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
4500 let (_dir, repo) = temp_repo();
4501 let blob = sample_blob();
4502 repo.put_redaction(sample_redaction(blob))
4503 .expect("put redaction");
4504 let expected_bytes = repo
4505 .store()
4506 .get_redactions_bytes_for_blob(&blob)
4507 .expect("load sidecar")
4508 .expect("sidecar exists");
4509
4510 let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");
4511
4512 let Some(push_message::Body::Redaction(transfer)) = message.body else {
4513 panic!("expected redaction transfer");
4514 };
4515 assert_eq!(transfer.blob_hash, blob.to_hex());
4516 assert_eq!(transfer.redactions_blob, expected_bytes);
4517 }
4518
4519 #[test]
4520 fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
4521 let (_dir, repo) = temp_repo();
4522 let blob = sample_blob();
4523
4524 let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("missing sidecar");
4525
4526 assert!(matches!(err, ProtocolError::InvalidState(_)));
4527 assert!(
4528 err.to_string().contains(&format!(
4529 "server wants redaction for blob {} but sender has no sidecar",
4530 blob.to_hex()
4531 )),
4532 "unexpected error: {err}"
4533 );
4534 }
4535
4536 #[test]
4537 fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
4538 let (_dir, repo) = temp_repo();
4539 let blob = sample_blob();
4540 let redaction_path = repo
4541 .heddle_dir()
4542 .join("redactions")
4543 .join(format!("{}.bin", blob.to_hex()));
4544 std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");
4545
4546 let err = redaction_push_message(&repo, redaction_info(blob)).expect_err("load error");
4547
4548 assert!(matches!(err, ProtocolError::InvalidState(_)));
4549 assert!(
4550 err.to_string()
4551 .contains(&format!("load redactions sidecar for {}:", blob.to_hex())),
4552 "unexpected error: {err}"
4553 );
4554 }
4555
4556 #[test]
4557 fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
4558 let (_dir, repo) = temp_repo();
4559 let state = ChangeId::from_bytes([17u8; 16]);
4560 repo.put_state_visibility(sample_state_visibility(state))
4561 .expect("put state visibility");
4562 let expected_bytes = repo
4563 .get_state_visibility_bytes_for_state(&state)
4564 .expect("load sidecar")
4565 .expect("sidecar exists");
4566
4567 let message =
4568 state_visibility_push_message(&repo, state_visibility_info(state)).expect("message");
4569
4570 let Some(push_message::Body::StateVisibility(transfer)) = message.body else {
4571 panic!("expected state visibility transfer");
4572 };
4573 assert_eq!(transfer.state_id, state.to_string_full());
4574 assert_eq!(transfer.state_visibility_blob, expected_bytes);
4575 }
4576
4577 fn sample_attribution() -> Attribution {
4580 Attribution::human(Principal {
4581 name: "Grace Hopper".into(),
4582 email: "grace@example.com".into(),
4583 })
4584 }
4585
4586 fn put_state_with_blob(repo: &Repository, contents: &str, parents: Vec<ChangeId>) -> ChangeId {
4589 let blob = Blob::from(contents);
4590 let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4591 let tree = Tree::from_entries(vec![
4592 TreeEntry::file(format!("{contents}.txt"), blob_hash, false).expect("tree entry"),
4593 ]);
4594 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4595 let state = State::new_snapshot(tree_hash, parents, sample_attribution());
4596 let state_id = state.change_id;
4597 repo.store().put_state(&state).expect("put state");
4598 state_id
4599 }
4600
4601 #[test]
4602 fn locally_complete_pull_head_advertises_fully_present_thread_head() {
4603 let (_dir, repo) = temp_repo();
4604 let head = put_state_with_blob(&repo, "alpha", vec![]);
4605 repo.refs()
4606 .set_thread(&ThreadName::from("main"), &head)
4607 .expect("set thread");
4608
4609 let advertised =
4610 locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4611 assert_eq!(
4612 advertised,
4613 Some(head),
4614 "a thread head whose full closure is present locally must be advertised"
4615 );
4616 }
4617
4618 #[test]
4619 fn locally_complete_pull_head_skips_unknown_thread() {
4620 let (_dir, repo) = temp_repo_unseeded();
4624 let advertised =
4625 locally_complete_pull_head(&repo, "nonexistent", None).expect("completeness check");
4626 assert_eq!(advertised, None);
4627 }
4628
4629 #[test]
4630 fn locally_complete_pull_head_skips_when_target_state_override_in_play() {
4631 let (_dir, repo) = temp_repo();
4632 let head = put_state_with_blob(&repo, "alpha", vec![]);
4633 repo.refs()
4634 .set_thread(&ThreadName::from("main"), &head)
4635 .expect("set thread");
4636
4637 let advertised =
4640 locally_complete_pull_head(&repo, "main", Some(head)).expect("completeness check");
4641 assert_eq!(advertised, None);
4642 }
4643
4644 #[test]
4645 fn locally_complete_pull_head_skips_repo_with_missing_blobs() {
4646 let (_dir, repo) = temp_repo();
4651 let head = put_state_with_blob(&repo, "alpha", vec![]);
4652 repo.refs()
4653 .set_thread(&ThreadName::from("main"), &head)
4654 .expect("set thread");
4655 repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4656 .expect("record missing blob");
4657
4658 let advertised =
4659 locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4660 assert_eq!(
4661 advertised, None,
4662 "a repo carrying missing blobs must never advertise a head"
4663 );
4664 }
4665
4666 #[test]
4667 fn locally_complete_pull_head_skips_head_with_incomplete_closure() {
4668 let (_dir, repo) = temp_repo();
4673 let absent_parent = ChangeId::generate();
4674 let blob = Blob::from("orphan");
4675 let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4676 let tree = Tree::from_entries(vec![
4677 TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4678 ]);
4679 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4680 let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4682 let child_id = child.change_id;
4683 repo.store().put_state(&child).expect("put child state");
4684 repo.refs()
4685 .set_thread(&ThreadName::from("main"), &child_id)
4686 .expect("set thread");
4687
4688 let advertised =
4689 locally_complete_pull_head(&repo, "main", None).expect("completeness check");
4690 assert_eq!(
4691 advertised, None,
4692 "a head whose closure has an absent parent state must not be advertised"
4693 );
4694 }
4695
4696 #[test]
4697 fn locally_complete_local_thread_head_advertises_fully_present_thread_head() {
4698 let (_dir, repo) = temp_repo();
4701 let head = put_state_with_blob(&repo, "alpha", vec![]);
4702 repo.refs()
4703 .set_thread(&ThreadName::from("feature"), &head)
4704 .expect("set thread");
4705
4706 let advertised =
4707 locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4708 assert_eq!(
4709 advertised,
4710 Some(head),
4711 "an explicit local-thread head whose full closure is present must be advertised"
4712 );
4713 }
4714
4715 #[test]
4716 fn locally_complete_local_thread_head_skips_unknown_thread() {
4717 let (_dir, repo) = temp_repo_unseeded();
4720 let advertised = locally_complete_local_thread_head(&repo, "nonexistent", None)
4721 .expect("completeness check");
4722 assert_eq!(advertised, None);
4723 }
4724
4725 #[test]
4726 fn locally_complete_local_thread_head_skips_when_target_state_override_in_play() {
4727 let (_dir, repo) = temp_repo();
4728 let head = put_state_with_blob(&repo, "alpha", vec![]);
4729 repo.refs()
4730 .set_thread(&ThreadName::from("feature"), &head)
4731 .expect("set thread");
4732
4733 let advertised = locally_complete_local_thread_head(&repo, "feature", Some(head))
4736 .expect("completeness check");
4737 assert_eq!(advertised, None);
4738 }
4739
4740 #[test]
4741 fn locally_complete_local_thread_head_skips_repo_with_missing_blobs() {
4742 let (_dir, repo) = temp_repo();
4745 let head = put_state_with_blob(&repo, "alpha", vec![]);
4746 repo.refs()
4747 .set_thread(&ThreadName::from("feature"), &head)
4748 .expect("set thread");
4749 repo.record_missing_blob(ContentHash::from_bytes([88u8; 32]))
4750 .expect("record missing blob");
4751
4752 let advertised =
4753 locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4754 assert_eq!(
4755 advertised, None,
4756 "a repo carrying missing blobs must never advertise an explicit local-thread head"
4757 );
4758 }
4759
4760 #[test]
4761 fn locally_complete_local_thread_head_skips_head_with_incomplete_closure() {
4762 let (_dir, repo) = temp_repo();
4767 let absent_parent = ChangeId::generate();
4768 let blob = Blob::from("orphan");
4769 let blob_hash = repo.store().put_blob(&blob).expect("put blob");
4770 let tree = Tree::from_entries(vec![
4771 TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry"),
4772 ]);
4773 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
4774 let child = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
4775 let child_id = child.change_id;
4776 repo.store().put_state(&child).expect("put child state");
4777 repo.refs()
4778 .set_thread(&ThreadName::from("feature"), &child_id)
4779 .expect("set thread");
4780
4781 let advertised =
4782 locally_complete_local_thread_head(&repo, "feature", None).expect("completeness check");
4783 assert_eq!(
4784 advertised, None,
4785 "an explicit local-thread head whose closure has an absent parent must not be advertised"
4786 );
4787 }
4788
4789 #[derive(Clone)]
4795 struct DeltaAwarePullService {
4796 remote_state: ChangeId,
4797 full_closure: Vec<ObjectInfo>,
4803 delta_objects: Vec<ObjectInfo>,
4804 known_parent: ChangeId,
4805 full_pack: wire::NativePackBundle,
4806 delta_pack: wire::NativePackBundle,
4807 captured_exclude: std::sync::Arc<std::sync::Mutex<Option<Vec<String>>>>,
4808 }
4809
4810 #[tonic::async_trait]
4811 impl RepoSyncService for DeltaAwarePullService {
4812 async fn list_refs(
4813 &self,
4814 _request: tonic::Request<ListRefsRequest>,
4815 ) -> Result<Response<ListRefsResponse>, Status> {
4816 Ok(Response::new(ListRefsResponse::default()))
4817 }
4818
4819 async fn update_ref(
4820 &self,
4821 _request: tonic::Request<UpdateRefRequest>,
4822 ) -> Result<Response<UpdateRefResponse>, Status> {
4823 Ok(Response::new(UpdateRefResponse::default()))
4824 }
4825
4826 type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;
4827
4828 async fn push(
4829 &self,
4830 _request: tonic::Request<tonic::Streaming<PushMessage>>,
4831 ) -> Result<Response<Self::PushStream>, Status> {
4832 let (_tx, rx) = mpsc::channel(1);
4833 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4834 rx,
4835 )))
4836 }
4837
4838 type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;
4839
4840 async fn pull(
4841 &self,
4842 request: tonic::Request<tonic::Streaming<PullMessage>>,
4843 ) -> Result<Response<Self::PullStream>, Status> {
4844 let svc = self.clone();
4845 let (tx, rx) = mpsc::channel(16);
4846
4847 tokio::spawn(async move {
4848 let mut inbound = request.into_inner();
4849 let exclude = match inbound.message().await {
4850 Ok(Some(PullMessage {
4851 body: Some(pull_message::Body::Request(req)),
4852 })) => req.exclude_states,
4853 other => {
4854 let _ = tx
4855 .send(Err(Status::invalid_argument(format!(
4856 "expected pull request, got {other:?}"
4857 ))))
4858 .await;
4859 return;
4860 }
4861 };
4862 *svc.captured_exclude.lock().unwrap() = Some(exclude.clone());
4863
4864 let remote_full = svc.remote_state.to_string_full();
4865 let parent_full = svc.known_parent.to_string_full();
4866
4867 let (objects, pack, short_circuit) = if exclude.contains(&remote_full) {
4869 (Vec::new(), None, true)
4871 } else if exclude.contains(&parent_full) {
4872 (
4874 svc.delta_objects.clone(),
4875 Some(svc.delta_pack.clone()),
4876 false,
4877 )
4878 } else {
4879 (svc.full_closure.clone(), Some(svc.full_pack.clone()), false)
4881 };
4882
4883 let descriptors: Vec<_> = objects
4884 .iter()
4885 .map(|info| {
4886 object_descriptor_with_status(
4887 info,
4888 ObjectAvailabilityStatus::Missing,
4889 "requested by client",
4890 )
4891 })
4892 .collect();
4893
4894 let ready = PullMessage {
4895 body: Some(pull_message::Body::Ready(PullReady {
4896 remote_state: remote_full.clone(),
4897 objects_to_fetch: descriptors,
4898 transfer: None,
4899 partial_fetch_status: PartialFetchStatus::Disabled as i32,
4900 missing_objects: Vec::new(),
4901 full_closure_available: false,
4902 object_count: objects.len() as u32,
4903 remote_revision_address: RevisionAddress::heddle(svc.remote_state)
4904 .to_string(),
4905 })),
4906 };
4907 if tx.send(Ok(ready)).await.is_err() {
4908 return;
4909 }
4910
4911 match inbound.message().await {
4913 Ok(Some(PullMessage {
4914 body: Some(pull_message::Body::Want(_)),
4915 })) => {}
4916 other => {
4917 let _ = tx
4918 .send(Err(Status::invalid_argument(format!(
4919 "expected want, got {other:?}"
4920 ))))
4921 .await;
4922 return;
4923 }
4924 }
4925
4926 if let Some(pack) = pack
4927 && !short_circuit
4928 {
4929 for message in encode_pull_native_pack_messages(&pack, "delta-aware-test", 64) {
4930 if tx.send(Ok(message)).await.is_err() {
4931 return;
4932 }
4933 }
4934 }
4935
4936 let complete = PullMessage {
4937 body: Some(pull_message::Body::Complete(GrpcPullComplete {
4938 success: true,
4939 new_state: remote_full,
4940 error: String::new(),
4941 transfer: Some(TransferCheckpoint {
4942 transfer_id: "delta-aware-test".to_string(),
4943 transport_mode: TransportMode::NativePack as i32,
4944 resume_offset: 0,
4945 chunk_index: 0,
4946 checkpoint: b"heddle-markers-v1\n".to_vec(),
4947 is_complete: true,
4948 }),
4949 new_revision_address: RevisionAddress::heddle(svc.remote_state)
4950 .to_string(),
4951 })),
4952 };
4953 let _ = tx.send(Ok(complete)).await;
4954 });
4955
4956 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
4957 rx,
4958 )))
4959 }
4960 }
4961
4962 async fn connect_delta_aware_service(
4963 service: DeltaAwarePullService,
4964 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4965 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4966 Ok(listener) => listener,
4967 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4968 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4969 return None;
4970 }
4971 Err(err) => panic!("bind test server: {err}"),
4972 };
4973 let addr = listener.local_addr().expect("local addr");
4974 let incoming = futures::stream::unfold(listener, |listener| async {
4975 match listener.accept().await {
4976 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4977 Err(err) => Some((Err(err), listener)),
4978 }
4979 });
4980 let handle = tokio::spawn(async move {
4981 Server::builder()
4982 .add_service(RepoSyncServiceServer::new(service))
4983 .serve_with_incoming(incoming)
4984 .await
4985 .expect("serve delta-aware test service");
4986 });
4987 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4988 .await
4989 .expect("connect client");
4990 Some((client, handle))
4991 }
4992
4993 #[tokio::test]
4994 async fn warm_bare_pull_advertises_head_and_fires_zero_delta_short_circuit() {
4995 let (_dir, repo) = temp_repo();
4999 let head = put_state_with_blob(&repo, "tip", vec![]);
5000 repo.refs()
5001 .set_thread(&ThreadName::from("main"), &head)
5002 .expect("set thread");
5003
5004 let full_closure =
5005 wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
5006 let full_pack =
5007 wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
5008 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5009
5010 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5011 remote_state: head,
5012 full_closure,
5013 delta_objects: Vec::new(),
5014 known_parent: ChangeId::generate(),
5015 full_pack: full_pack.clone(),
5016 delta_pack: full_pack,
5017 captured_exclude: captured.clone(),
5018 })
5019 .await
5020 else {
5021 return;
5022 };
5023
5024 let exchange = tokio::time::timeout(
5025 Duration::from_secs(5),
5026 client.pull_exchange(
5027 &repo,
5028 "owner/repo",
5029 "main",
5030 PullOptions {
5031 local_thread: None,
5032 depth: None,
5033 target_state: None,
5034 materialization: PullMaterialization::Full,
5035 },
5036 ),
5037 )
5038 .await
5039 .expect("warm bare pull must not hang")
5040 .expect("warm bare pull succeeds");
5041 server.abort();
5042
5043 let advertised = captured.lock().unwrap().clone().expect("request captured");
5044 assert_eq!(
5045 advertised,
5046 vec![head.to_string_full()],
5047 "a bare pull at the tip must advertise that tip in exclude_states"
5048 );
5049 assert!(exchange.result.success);
5050 assert_eq!(
5051 exchange.object_count, 0,
5052 "the zero-delta short-circuit must transfer no objects, not the full closure"
5053 );
5054 }
5055
5056 #[tokio::test]
5057 async fn behind_client_bare_pull_receives_exactly_the_missing_delta() {
5058 let (_src_dir, src_repo) = temp_repo_unseeded();
5065 let parent = put_state_with_blob(&src_repo, "base", vec![]);
5066 let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
5067 let child_closure =
5068 wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
5069
5070 let (_dir, repo) = temp_repo_unseeded();
5074 let parent_closure =
5075 wire::enumerate_state_closure(src_repo.store(), parent).expect("parent closure");
5076 let parent_pack =
5077 wire::build_native_pack(src_repo.store(), &parent_closure).expect("parent pack");
5078 wire::install_received_pack(
5079 repo.store(),
5080 &parent_pack.pack_data,
5081 &parent_pack.index_data,
5082 )
5083 .expect("install parent closure into client");
5084 repo.refs()
5085 .set_thread(&ThreadName::from("main"), &parent)
5086 .expect("set thread to parent");
5087 wire::enumerate_state_closure(repo.store(), parent).expect("client holds parent closure");
5089 assert!(
5091 repo.store()
5092 .get_state(&child)
5093 .expect("probe child")
5094 .is_none(),
5095 "client must start without the child state"
5096 );
5097 let parent_clone = parent;
5098 let full_pack =
5099 wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
5100 let delta_objects = wire::enumerate_state_closure_with_options(
5103 src_repo.store(),
5104 child,
5105 wire::StateClosureOptions {
5106 depth: None,
5107 exclude_states: vec![parent_clone],
5108 },
5109 )
5110 .expect("delta closure");
5111 assert!(
5112 !delta_objects.is_empty() && delta_objects.len() < child_closure.len(),
5113 "delta must be a strict, non-empty subset of the full closure"
5114 );
5115 let delta_pack =
5116 wire::build_native_pack(src_repo.store(), &delta_objects).expect("delta pack");
5117
5118 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5119 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5120 remote_state: child,
5121 full_closure: child_closure.clone(),
5122 delta_objects: delta_objects.clone(),
5123 known_parent: parent_clone,
5124 full_pack,
5125 delta_pack,
5126 captured_exclude: captured.clone(),
5127 })
5128 .await
5129 else {
5130 return;
5131 };
5132
5133 let exchange = tokio::time::timeout(
5134 Duration::from_secs(5),
5135 client.pull_exchange(
5136 &repo,
5137 "owner/repo",
5138 "main",
5139 PullOptions {
5140 local_thread: None,
5141 depth: None,
5142 target_state: None,
5143 materialization: PullMaterialization::Full,
5144 },
5145 ),
5146 )
5147 .await
5148 .expect("behind-client pull must not hang")
5149 .expect("behind-client pull succeeds");
5150 server.abort();
5151
5152 let advertised = captured.lock().unwrap().clone().expect("request captured");
5153 assert_eq!(
5154 advertised,
5155 vec![parent.to_string_full()],
5156 "a behind client must advertise the parent head it holds"
5157 );
5158 assert!(exchange.result.success);
5159 let reassembled = wire::enumerate_state_closure(repo.store(), child)
5163 .expect("the client must hold the complete child closure after a delta pull");
5164 assert_eq!(
5165 reassembled.len(),
5166 child_closure.len(),
5167 "delta pull must leave the client with the full child closure, no gaps"
5168 );
5169 }
5170
5171 #[tokio::test]
5172 async fn fresh_bare_pull_advertises_nothing_and_gets_full_closure() {
5173 let (_dir, repo) = temp_repo_unseeded();
5177 let (_src_dir, src_repo) = temp_repo_unseeded();
5178 let remote = put_state_with_blob(&src_repo, "seed", vec![]);
5179 let full_closure =
5180 wire::enumerate_state_closure(src_repo.store(), remote).expect("closure");
5181 let full_pack =
5182 wire::build_native_pack(src_repo.store(), &full_closure).expect("full pack");
5183
5184 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5185 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5186 remote_state: remote,
5187 full_closure: full_closure.clone(),
5188 delta_objects: Vec::new(),
5189 known_parent: ChangeId::generate(),
5190 full_pack: full_pack.clone(),
5191 delta_pack: full_pack,
5192 captured_exclude: captured.clone(),
5193 })
5194 .await
5195 else {
5196 return;
5197 };
5198
5199 let exchange = tokio::time::timeout(
5200 Duration::from_secs(5),
5201 client.pull_exchange(
5202 &repo,
5203 "owner/repo",
5204 "main",
5205 PullOptions {
5206 local_thread: None,
5207 depth: None,
5208 target_state: None,
5209 materialization: PullMaterialization::Full,
5210 },
5211 ),
5212 )
5213 .await
5214 .expect("fresh bare pull must not hang")
5215 .expect("fresh bare pull succeeds");
5216 server.abort();
5217
5218 let advertised = captured.lock().unwrap().clone().expect("request captured");
5219 assert!(
5220 advertised.is_empty(),
5221 "a fresh repo must advertise no exclude_states"
5222 );
5223 assert!(exchange.result.success);
5224 assert_eq!(
5225 exchange.object_count,
5226 full_closure.len(),
5227 "a fresh pull must receive the full closure, nothing wrongly excluded"
5228 );
5229 assert!(
5230 wire::enumerate_state_closure(repo.store(), remote).is_ok(),
5231 "fresh pull must install the complete closure"
5232 );
5233 }
5234
5235 #[tokio::test]
5236 async fn explicit_local_thread_incomplete_closure_does_not_advertise_and_repairs() {
5237 let (_src_dir, src_repo) = temp_repo_unseeded();
5247 let parent = put_state_with_blob(&src_repo, "base", vec![]);
5248 let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
5249 let child_closure =
5250 wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
5251 let full_pack =
5252 wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
5253
5254 let (_dir, repo) = temp_repo_unseeded();
5259 let child_only = wire::enumerate_state_closure_with_options(
5260 src_repo.store(),
5261 child,
5262 wire::StateClosureOptions {
5263 depth: None,
5264 exclude_states: vec![parent],
5265 },
5266 )
5267 .expect("child-only objects");
5268 let child_only_pack =
5269 wire::build_native_pack(src_repo.store(), &child_only).expect("child-only pack");
5270 wire::install_received_pack(
5271 repo.store(),
5272 &child_only_pack.pack_data,
5273 &child_only_pack.index_data,
5274 )
5275 .expect("install child-only objects");
5276 repo.refs()
5277 .set_thread(&ThreadName::from("feature"), &child)
5278 .expect("set local thread to child");
5279 assert!(
5282 matches!(
5283 wire::enumerate_state_closure(repo.store(), child),
5284 Err(ProtocolError::ObjectNotFound(_))
5285 ),
5286 "the explicit thread head's closure must start incomplete"
5287 );
5288
5289 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5290 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5291 remote_state: child,
5292 full_closure: child_closure.clone(),
5293 delta_objects: Vec::new(),
5294 known_parent: parent,
5295 full_pack: full_pack.clone(),
5296 delta_pack: full_pack,
5297 captured_exclude: captured.clone(),
5298 })
5299 .await
5300 else {
5301 return;
5302 };
5303
5304 let exchange = tokio::time::timeout(
5305 Duration::from_secs(5),
5306 client.pull_exchange(
5307 &repo,
5308 "owner/repo",
5309 "main",
5310 PullOptions {
5311 local_thread: Some("feature"),
5312 depth: None,
5313 target_state: None,
5314 materialization: PullMaterialization::Full,
5315 },
5316 ),
5317 )
5318 .await
5319 .expect("explicit-thread pull must not hang")
5320 .expect("explicit-thread pull succeeds");
5321 server.abort();
5322
5323 let advertised = captured.lock().unwrap().clone().expect("request captured");
5324 assert!(
5325 advertised.is_empty(),
5326 "an explicit --local-thread with an incomplete closure must advertise nothing, \
5327 falling back to a full pull (got {advertised:?})"
5328 );
5329 assert!(exchange.result.success);
5330 let reassembled = wire::enumerate_state_closure(repo.store(), child)
5332 .expect("the client must hold the complete child closure after the fallback full pull");
5333 assert_eq!(
5334 reassembled.len(),
5335 child_closure.len(),
5336 "the full-pull fallback must leave the client with the complete closure, no gaps"
5337 );
5338 }
5339
5340 #[tokio::test]
5341 async fn explicit_local_thread_complete_closure_advertises_and_fires_short_circuit() {
5342 let (_dir, repo) = temp_repo();
5347 let head = put_state_with_blob(&repo, "tip", vec![]);
5348 repo.refs()
5349 .set_thread(&ThreadName::from("feature"), &head)
5350 .expect("set local thread");
5351 wire::enumerate_state_closure(repo.store(), head)
5353 .expect("client holds the complete thread closure");
5354
5355 let full_closure =
5356 wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
5357 let full_pack =
5358 wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
5359 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
5360
5361 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
5362 remote_state: head,
5363 full_closure,
5364 delta_objects: Vec::new(),
5365 known_parent: ChangeId::generate(),
5366 full_pack: full_pack.clone(),
5367 delta_pack: full_pack,
5368 captured_exclude: captured.clone(),
5369 })
5370 .await
5371 else {
5372 return;
5373 };
5374
5375 let exchange = tokio::time::timeout(
5376 Duration::from_secs(5),
5377 client.pull_exchange(
5378 &repo,
5379 "owner/repo",
5380 "main",
5381 PullOptions {
5382 local_thread: Some("feature"),
5383 depth: None,
5384 target_state: None,
5385 materialization: PullMaterialization::Full,
5386 },
5387 ),
5388 )
5389 .await
5390 .expect("explicit-thread pull must not hang")
5391 .expect("explicit-thread pull succeeds");
5392 server.abort();
5393
5394 let advertised = captured.lock().unwrap().clone().expect("request captured");
5395 assert_eq!(
5396 advertised,
5397 vec![head.to_string_full()],
5398 "an explicit --local-thread with a complete closure must advertise its head"
5399 );
5400 assert!(exchange.result.success);
5401 assert_eq!(
5402 exchange.object_count, 0,
5403 "advertising the complete head must fire the zero-delta short-circuit, not a full pull"
5404 );
5405 }
5406}