1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 io::{self, Write},
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8 time::{Duration, Instant},
9};
10
11use grpc::heddle::v1::{
12 GetBlobRequest, GitCheckpointTransfer, GitLaneTransfer, GitPackTransfer, GitRefKind,
13 GitRefUpdateTransfer, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
14 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
15 RedactionTransfer, StateVisibilityTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy,
16 ThreadMetadata, ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects,
17 git_lane_transfer, pull_message, push_message,
18};
19use objects::{
20 object::{ChangeId, ContentHash, MarkerName, ThreadName},
21 store::{AnyStore, ObjectStore, PackObjectId},
22};
23use repo::{
24 GitCheckpointRecord, Repository, RepositoryCapability, RevisionAddress, SyncedThreadMetadata,
25 ThreadManager,
26};
27use sley::{
28 ObjectId as GitObjectId, RefPrecondition, ReferenceTarget, Repository as SleyRepository,
29};
30use tokio::sync::mpsc;
31use tokio_stream::wrappers::ReceiverStream;
32use tonic::Request;
33use wire::{
34 ObjectInfo, ObjectType, PlannedObject, ProtocolError, PullComplete, PushComplete, RefEntry,
35 RefUpdated,
36};
37
38use super::{
39 HostedGrpcClient, PullMaterialization,
40 helpers::{
41 descriptor_id, descriptor_id_from_info, object_descriptor_with_status, object_type_name,
42 parse_descriptor_to_info, status_to_protocol_error, to_proto_object_info,
43 transport_mode_name,
44 },
45};
46
/// Per-call knobs that the public pull/fetch entry points hand to
/// `pull_exchange`.
#[derive(Clone, Copy)]
struct PullOptions<'a> {
    /// Local thread whose ref is set to the final state after a successful
    /// pull; also sent as `PullRequest.local_thread` (empty when `None`).
    local_thread: Option<&'a str>,
    /// Sent as `PullRequest.depth`; `None` is transmitted as the default value.
    depth: Option<u32>,
    /// Explicit state to request; sent both as `target_state` and as
    /// `target_revision_address` (both empty when `None`).
    target_state: Option<ChangeId>,
    /// Decides, via `allows_partial_fetch()`, whether the exchange may leave
    /// objects unfetched.
    materialization: PullMaterialization,
}
54
/// Result of `plan_pull_wants`: what the client asks the server to send after
/// `PullReady`.
struct PullWantPlan {
    /// Descriptors forwarded verbatim as `WantObjects.objects`.
    wants: Vec<ObjectDescriptor>,
    /// Object types recorded per pack object id; consulted to decide whether a
    /// native pack is required and to classify installed objects.
    wanted_types: WantedTypes,
    /// Forwarded as `WantObjects.want_full_closure`.
    want_full_closure: bool,
}
60
/// Object types requested for each pack object id. A single id may carry
/// several types; `record_wanted_type` keeps each list free of duplicates.
type WantedTypes = HashMap<PackObjectId, Vec<ObjectType>>;
62
/// Git-lane payload attached to a push by `push_git_overlay_checkpoint`.
struct GitLanePushPlan {
    /// Revision address sent as `PushRequest.local_revision_address`.
    local_revision_address: String,
    /// Git pack streamed to the server after the native objects.
    pack: GitPackPushPlan,
    /// Ref-update message sent after the git pack; it is adjusted with the
    /// server's `remote_revision_address` once `PushReady` arrives.
    ref_update: PushMessage,
}
68
/// Description of the git pack streamed by `send_git_pack_streaming_messages`.
///
/// NOTE(review): the field meanings are not established by the code visible
/// here; confirm them against `build_git_lane_push_plan` before relying on
/// the names alone.
#[derive(Clone)]
struct GitPackPushPlan {
    transfer_id: String,
    pack_id: Vec<u8>,
    pack_size: u64,
    root: GitObjectId,
    git_repo: SleyRepository,
}
77
/// Pushes whose planned closure has at most this many objects enumerate the
/// full closure (`wire::enumerate_state_closure`); larger pushes advertise
/// descriptors derived from the plan instead.
const PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD: usize = 512;
/// Pulls that advertise more than this many objects receive pack chunks into a
/// `wire::PackChunkSpool` created under `repo.heddle_dir()` instead of a
/// `wire::PackChunkState`.
const PULL_PACK_SPOOL_OBJECT_THRESHOLD: usize = 512;
// NOTE(review): the three constants below are not referenced in the part of
// the file visible here; presumably they tune native pack streaming — confirm
// at their use sites.
const NATIVE_PACK_DRAIN_OBJECT_INTERVAL: usize = 32;
const NATIVE_PACK_OBJECT_PREFETCH_LIMIT: usize = 32;
const NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT: usize = 8;
83
/// Per-object-type tallies collected while a pull is applied; one counter per
/// `ObjectType` variant.
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Number of `ObjectType::Blob` objects recorded.
    pub blobs: usize,
    /// Number of `ObjectType::Tree` objects recorded.
    pub trees: usize,
    /// Number of `ObjectType::State` objects recorded.
    pub states: usize,
    /// Number of `ObjectType::Action` objects recorded.
    pub actions: usize,
    /// Number of `ObjectType::Redaction` objects recorded.
    pub redactions: usize,
    /// Number of `ObjectType::StateVisibility` objects recorded.
    pub state_visibilities: usize,
}
93
/// A ref as reported by the hosted `ListRefs` RPC, including the revision
/// address that the plain `RefEntry` does not carry.
#[derive(Debug, Clone)]
pub struct HostedRefEntry {
    /// Ref name exactly as returned by the server.
    pub name: String,
    /// Change id decoded from the response's raw `change_id` bytes.
    pub change_id: ChangeId,
    /// Copied from the response's `is_thread` flag.
    pub is_thread: bool,
    /// Revision address string copied verbatim from the response.
    pub revision_address: String,
}
101
102impl PullObjectMix {
103 fn record(&mut self, obj_type: ObjectType) {
104 match obj_type {
105 ObjectType::Blob => self.blobs += 1,
106 ObjectType::Tree => self.trees += 1,
107 ObjectType::State => self.states += 1,
108 ObjectType::Action => self.actions += 1,
109 ObjectType::Redaction => self.redactions += 1,
110 ObjectType::StateVisibility => self.state_visibilities += 1,
111 }
112 }
113
114 pub fn total(&self) -> usize {
115 self.blobs
116 + self.trees
117 + self.states
118 + self.actions
119 + self.redactions
120 + self.state_visibilities
121 }
122}
123
/// Timing and volume measurements gathered by `pull_exchange`.
///
/// NOTE(review): `raw_decode_apply`, `raw_decode` and `raw_bytes_received` are
/// never written by the code visible in this part of the file; confirm
/// whether another path populates them.
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until `PullReady` was received.
    pub ready_wait: Duration,
    /// Time from sending the want list until `PullComplete` was received.
    pub receive_and_apply: Duration,
    /// Accumulated time spent receiving pack chunks.
    pub decode: Duration,
    /// Accumulated time spent accepting redaction, state-visibility and
    /// git-lane transfers and installing the received pack.
    pub store_receive_object: Duration,
    /// Time spent on ref, missing-blob and marker updates after a successful
    /// pull.
    pub metadata_sync: Duration,
    /// Incremented by the same per-chunk duration as `decode`.
    pub pack_decode_apply: Duration,
    pub raw_decode_apply: Duration,
    /// Incremented by the same per-chunk duration as `decode`.
    pub pack_decode: Duration,
    pub raw_decode: Duration,
    /// Payload bytes counted across pack chunks and sidecar/git-lane transfers.
    pub bytes_received: usize,
    /// Payload bytes counted for pack chunks only.
    pub pack_bytes_received: usize,
    pub raw_bytes_received: usize,
    /// Number of object ids reported by the pack install step.
    pub objects_received: usize,
    /// Breakdown of received objects by type.
    pub object_mix: PullObjectMix,
}
141
142impl HostedGrpcClient {
143 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
144 Ok(self
145 .list_refs_with_revision_addresses(repo_path)
146 .await?
147 .into_iter()
148 .map(|entry| RefEntry {
149 name: entry.name,
150 change_id: entry.change_id,
151 is_thread: entry.is_thread,
152 })
153 .collect())
154 }
155
156 pub async fn list_refs_with_revision_addresses(
157 &mut self,
158 repo_path: &str,
159 ) -> Result<Vec<HostedRefEntry>, ProtocolError> {
160 let mut request = Request::new(ListRefsRequest {
161 repo_path: repo_path.to_string(),
162 });
163 self.apply_auth(&mut request)?;
164 let response = self
165 .inner
166 .list_refs(request)
167 .await
168 .map_err(status_to_protocol_error)?
169 .into_inner();
170 response
171 .refs
172 .into_iter()
173 .map(|entry| {
174 Ok(HostedRefEntry {
175 name: entry.name,
176 change_id: ChangeId::try_from_slice(&entry.change_id)
177 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
178 is_thread: entry.is_thread,
179 revision_address: entry.revision_address,
180 })
181 })
182 .collect()
183 }
184
185 #[allow(clippy::too_many_arguments)]
186 pub async fn update_ref(
187 &mut self,
188 repo_path: &str,
189 name: &str,
190 is_thread: bool,
191 old_value: Option<ChangeId>,
192 new_value: ChangeId,
193 force: bool,
194 thread_metadata: Option<&SyncedThreadMetadata>,
195 ) -> Result<RefUpdated, ProtocolError> {
196 let mut request = Request::new(UpdateRefRequest {
197 repo_path: repo_path.to_string(),
198 name: name.to_string(),
199 is_thread,
200 force,
201 old_value: old_value
202 .map(|value| value.to_string_full())
203 .unwrap_or_default(),
204 new_value: new_value.to_string_full(),
205 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
206 old_revision_address: old_value.map(native_revision_address).unwrap_or_default(),
207 new_revision_address: native_revision_address(new_value),
208 client_operation_id: String::new(),
209 });
210 self.apply_auth(&mut request)?;
211 let response = self
212 .inner
213 .update_ref(request)
214 .await
215 .map_err(status_to_protocol_error)?
216 .into_inner();
217 Ok(RefUpdated {
218 success: response.success,
219 old_value: if response.old_value.is_empty() {
220 None
221 } else {
222 Some(
223 ChangeId::parse(&response.old_value)
224 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
225 )
226 },
227 error: (!response.error.is_empty()).then_some(response.error),
228 })
229 }
230
231 pub async fn push(
232 &mut self,
233 repo: &Repository,
234 repo_path: &str,
235 local_state: ChangeId,
236 target_thread: &str,
237 force: bool,
238 ) -> Result<PushComplete, ProtocolError> {
239 self.push_with_revision(
240 repo,
241 repo_path,
242 local_state,
243 target_thread,
244 force,
245 native_revision_address(local_state),
246 None,
247 )
248 .await
249 }
250
251 pub async fn push_git_overlay_checkpoint(
252 &mut self,
253 repo: &Repository,
254 repo_path: &str,
255 local_state: ChangeId,
256 target_thread: &str,
257 force: bool,
258 ) -> Result<PushComplete, ProtocolError> {
259 let git_lane = build_git_lane_push_plan(
260 repo,
261 local_state,
262 target_thread,
263 self.transport.chunk_size.max(1),
264 )?;
265 let local_revision_address = git_lane.local_revision_address.clone();
266 self.push_with_revision(
267 repo,
268 repo_path,
269 local_state,
270 target_thread,
271 force,
272 local_revision_address,
273 Some(git_lane),
274 )
275 .await
276 }
277
    /// Runs one complete push exchange over the bidirectional `Push` stream.
    ///
    /// Sequence:
    /// 1. Enumerate the closure of `local_state` and send a `PushRequest`.
    /// 2. Wait for `PushReady`, which lists the objects the server wants.
    /// 3. Stream the wanted packable objects as a native pack, then the
    ///    redaction / state-visibility sidecars one message each, then (when
    ///    `git_lane` is present) the git pack followed by its ref update.
    /// 4. Close the outbound stream and read `PushComplete`.
    /// 5. On success, call `sync_remote_markers`.
    ///
    /// # Errors
    /// Returns `ProtocolError::InvalidState` when the server's first message
    /// is not `PushReady`, when it requests an object that was not advertised,
    /// when the outbound channel is closed early, when the final message is
    /// not `PushComplete`, or when `new_state` does not parse. RPC statuses
    /// are converted with `status_to_protocol_error`; errors from the helpers
    /// called along the way are propagated.
    #[allow(clippy::too_many_arguments)]
    async fn push_with_revision(
        &mut self,
        repo: &Repository,
        repo_path: &str,
        local_state: ChangeId,
        target_thread: &str,
        force: bool,
        local_revision_address: String,
        mut git_lane: Option<GitLanePushPlan>,
    ) -> Result<PushComplete, ProtocolError> {
        // These two reads have no effect on the exchange.
        // NOTE(review): presumably kept so the fields count as used — confirm
        // before removing.
        let _ = self.transport.chunk_size;
        let _ = self.transport.resume_attempts;
        let object_plan = wire::enumerate_state_closure_plan(repo.store(), local_state)?;
        // Small closures are enumerated in full; larger ones are advertised
        // from the plan alone (see `object_info_from_plan`, which reports
        // size 0 and no delta base).
        let full_objects = if object_plan.len() <= PUSH_FULL_DESCRIPTOR_OBJECT_THRESHOLD {
            Some(wire::enumerate_state_closure(repo.store(), local_state)?)
        } else {
            None
        };
        let object_count = full_objects
            .as_ref()
            .map_or(object_plan.len(), std::vec::Vec::len);
        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
        let transport_mode = preferred_transport_mode(&self.transport, object_count);
        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
        let request_message = PushMessage {
            body: Some(push_message::Body::Request(PushRequest {
                repo_path: repo_path.to_string(),
                local_state: local_state.to_string_full(),
                target_thread: target_thread.to_string(),
                create_thread: true,
                force,
                objects: full_objects.as_ref().map_or_else(
                    || object_plan.iter().map(to_proto_planned_object).collect(),
                    |objects| objects.iter().map(to_proto_object_info).collect(),
                ),
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    transport_mode,
                    0,
                    0,
                    false,
                )),
                partial_fetch_status: partial_fetch_status_for_repo(repo),
                allow_partial_fetch: true,
                thread_metadata: thread_metadata
                    .map(|metadata| to_proto_thread_metadata(&metadata)),
                local_revision_address,
                client_operation_id: String::new(),
            })),
        };

        // The channel holds at least four messages; the request is queued
        // before the RPC is opened so it is the first message on the stream.
        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
        tx.send(request_message).await.map_err(|_| {
            ProtocolError::InvalidState("failed to initialize push stream".to_string())
        })?;
        let mut request = Request::new(ReceiverStream::new(rx));
        self.apply_auth(&mut request)?;
        let mut response = self
            .inner
            .push(request)
            .await
            .map_err(status_to_protocol_error)?
            .into_inner();

        // Anything other than `PushReady` (including end of stream) is a
        // protocol violation.
        let ready = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PushMessage {
                body: Some(push_message::Body::Ready(ready)),
            }) => ready,
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PushReady from gRPC server".to_string(),
                ));
            }
        };
        // The git ref update is adjusted using the remote revision address
        // the server reported.
        if let Some(git_lane) = git_lane.as_mut() {
            apply_git_ref_expectation(&mut git_lane.ref_update, &ready.remote_revision_address)?;
        }

        // Index every advertised object by descriptor id so the server's
        // wants can be resolved back to local object info.
        let object_index = match full_objects {
            Some(objects) => objects
                .into_iter()
                .map(|info| (descriptor_id_from_info(&info), info))
                .collect::<HashMap<_, _>>(),
            None => object_plan
                .into_iter()
                .map(|object| {
                    (
                        descriptor_id_from_plan(&object),
                        object_info_from_plan(&object),
                    )
                })
                .collect::<HashMap<_, _>>(),
        };

        // Use the transport mode the server returned; fall back to the
        // requested mode when it is absent or not a known enum value.
        let ready_transport_mode = ready
            .transfer
            .as_ref()
            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
            .unwrap_or(transport_mode);
        let wanted_infos = ready
            .want_objects
            .into_iter()
            .map(|want| {
                object_index
                    .get(&descriptor_id(&want))
                    .cloned()
                    .ok_or_else(|| {
                        ProtocolError::InvalidState("server requested unknown object".to_string())
                    })
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Redaction and state-visibility objects are sent as individual
        // messages; everything else goes through the native pack stream.
        let (wanted_sidecars, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
            .into_iter()
            .partition(|info| is_out_of_pack_transfer_object_type(info.obj_type));

        if !wanted_packable.is_empty() {
            send_native_pack_streaming_messages(
                &tx,
                repo,
                &wanted_packable,
                &transfer_id,
                self.transport.chunk_size.max(1),
                &self.transport,
                ready_transport_mode,
            )
            .await?;
        }

        for info in wanted_sidecars {
            let message = sidecar_push_message(repo, info)?;
            tx.send(message).await.map_err(|_| {
                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
            })?;
        }

        // The git pack is sent first and its ref update last.
        if let Some(git_lane) = git_lane {
            send_git_pack_streaming_messages(&tx, &git_lane.pack, self.transport.chunk_size.max(1))
                .await?;
            tx.send(git_lane.ref_update).await.map_err(|_| {
                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
            })?;
        }
        // Dropping the only sender ends the outbound request stream.
        drop(tx);

        // Every transfer-derived field falls back to a default when the
        // server omitted the transfer checkpoint.
        let result = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PushMessage {
                body: Some(push_message::Body::Complete(complete)),
            }) => PushComplete {
                success: complete.success,
                new_state: if complete.new_state.is_empty() {
                    None
                } else {
                    Some(
                        ChangeId::parse(&complete.new_state)
                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
                    )
                },
                error: (!complete.error.is_empty()).then_some(complete.error),
                transfer_id: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.transfer_id.clone())
                    .unwrap_or_default(),
                transport_mode: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transport_mode_name(transfer.transport_mode))
                    .unwrap_or("raw-objects")
                    .to_string(),
                resume_offset: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.resume_offset)
                    .unwrap_or_default(),
                chunk_index: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.chunk_index)
                    .unwrap_or_default(),
                checkpoint: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.checkpoint.clone())
                    .unwrap_or_default(),
                is_complete: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.is_complete)
                    .unwrap_or(false),
            },
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PushComplete from gRPC server".to_string(),
                ));
            }
        };

        // Markers are synchronized only after the server accepted the push.
        if result.success {
            self.sync_remote_markers(repo, repo_path, local_state)
                .await?;
        }
        Ok(result)
    }
487
488 pub async fn pull(
489 &mut self,
490 repo: &Repository,
491 repo_path: &str,
492 remote_thread: &str,
493 local_thread: Option<&str>,
494 ) -> Result<PullComplete, ProtocolError> {
495 self.pull_with_options(
496 repo,
497 repo_path,
498 remote_thread,
499 PullOptions {
500 local_thread,
501 depth: None,
502 target_state: None,
503 materialization: PullMaterialization::Full,
504 },
505 )
506 .await
507 }
508
509 pub async fn pull_profiled(
510 &mut self,
511 repo: &Repository,
512 repo_path: &str,
513 remote_thread: &str,
514 local_thread: Option<&str>,
515 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
516 self.pull_exchange(
517 repo,
518 repo_path,
519 remote_thread,
520 PullOptions {
521 local_thread,
522 depth: None,
523 target_state: None,
524 materialization: PullMaterialization::Full,
525 },
526 )
527 .await
528 .map(|exchange| (exchange.result, exchange.profile))
529 }
530
531 pub async fn pull_partial(
532 &mut self,
533 repo: &Repository,
534 repo_path: &str,
535 remote_thread: &str,
536 local_thread: Option<&str>,
537 ) -> Result<PullComplete, ProtocolError> {
538 self.pull_with_options(
539 repo,
540 repo_path,
541 remote_thread,
542 PullOptions {
543 local_thread,
544 depth: None,
545 target_state: None,
546 materialization: PullMaterialization::Lazy,
547 },
548 )
549 .await
550 }
551
552 pub async fn pull_with_depth_and_materialization(
553 &mut self,
554 repo: &Repository,
555 repo_path: &str,
556 remote_thread: &str,
557 local_thread: Option<&str>,
558 depth: Option<u32>,
559 materialization: PullMaterialization,
560 ) -> Result<PullComplete, ProtocolError> {
561 self.pull_with_options(
562 repo,
563 repo_path,
564 remote_thread,
565 PullOptions {
566 local_thread,
567 depth,
568 target_state: None,
569 materialization,
570 },
571 )
572 .await
573 }
574
575 pub async fn pull_with_depth(
576 &mut self,
577 repo: &Repository,
578 repo_path: &str,
579 remote_thread: &str,
580 local_thread: Option<&str>,
581 depth: Option<u32>,
582 ) -> Result<PullComplete, ProtocolError> {
583 self.pull_with_depth_and_materialization(
584 repo,
585 repo_path,
586 remote_thread,
587 local_thread,
588 depth,
589 PullMaterialization::Full,
590 )
591 .await
592 }
593
594 pub async fn fetch_state(
595 &mut self,
596 repo: &Repository,
597 repo_path: &str,
598 remote_thread: &str,
599 target_state: ChangeId,
600 ) -> Result<usize, ProtocolError> {
601 self.pull_exchange(
602 repo,
603 repo_path,
604 remote_thread,
605 PullOptions {
606 local_thread: None,
607 depth: None,
608 target_state: Some(target_state),
609 materialization: PullMaterialization::Full,
610 },
611 )
612 .await
613 .map(|exchange| exchange.object_count)
614 }
615
616 pub async fn fetch_state_partial(
617 &mut self,
618 repo: &Repository,
619 repo_path: &str,
620 remote_thread: &str,
621 target_state: ChangeId,
622 ) -> Result<usize, ProtocolError> {
623 self.pull_exchange(
624 repo,
625 repo_path,
626 remote_thread,
627 PullOptions {
628 local_thread: None,
629 depth: None,
630 target_state: Some(target_state),
631 materialization: PullMaterialization::Lazy,
632 },
633 )
634 .await
635 .map(|exchange| exchange.object_count)
636 }
637
638 pub async fn hydrate_blob_at_path(
639 &mut self,
640 repo: &Repository,
641 repo_path: &str,
642 reference: &str,
643 path: &str,
644 ) -> Result<objects::object::Blob, ProtocolError> {
645 let mut request = Request::new(GetBlobRequest {
646 repo_path: repo_path.to_string(),
647 r#ref: reference.to_string(),
648 path: path.to_string(),
649 });
650 self.apply_auth(&mut request)?;
651 let response = self
652 .content
653 .get_blob(request)
654 .await
655 .map_err(status_to_protocol_error)?
656 .into_inner();
657
658 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
659 let blob = objects::object::Blob::new(content);
660 repo.store().put_blob(&blob)?;
661 repo.clear_missing_blob(&blob.hash())?;
662 Ok(blob)
663 }
664
665 pub async fn hydrate_missing_blobs_for_state(
666 &mut self,
667 repo: &Repository,
668 repo_path: &str,
669 remote_thread: &str,
670 target_state: ChangeId,
671 ) -> Result<usize, ProtocolError> {
672 let exchange = self
673 .pull_exchange(
674 repo,
675 repo_path,
676 remote_thread,
677 PullOptions {
678 local_thread: None,
679 depth: None,
680 target_state: Some(target_state),
681 materialization: PullMaterialization::Full,
682 },
683 )
684 .await?;
685 clear_missing_blobs_for_state(repo, target_state)?;
686 Ok(exchange.object_count)
687 }
688
689 async fn pull_with_options(
690 &mut self,
691 repo: &Repository,
692 repo_path: &str,
693 remote_thread: &str,
694 options: PullOptions<'_>,
695 ) -> Result<PullComplete, ProtocolError> {
696 self.pull_exchange(repo, repo_path, remote_thread, options)
697 .await
698 .map(|exchange| exchange.result)
699 }
700
    /// Runs one complete pull exchange over the bidirectional `Pull` stream
    /// and applies everything the server sends to `repo`.
    ///
    /// Sequence:
    /// 1. Advertise a locally complete head (if any) as an excluded state and
    ///    send a `PullRequest`.
    /// 2. Wait for `PullReady`, plan the want list with `plan_pull_wants`,
    ///    send `WantObjects` and close the outbound stream.
    /// 3. Consume the response stream: native pack chunks, redaction and
    ///    state-visibility sidecars, and git-lane transfers.
    /// 4. On `PullComplete`, install the received pack (when one was
    ///    required), update refs / missing-blob records / markers, and return.
    ///
    /// Returns the `PullComplete` result together with the number of installed
    /// pack objects and the collected [`PullProfile`]. A `PullComplete` with
    /// `success == false` is returned as `Ok` with `result.success == false`.
    ///
    /// # Errors
    /// Returns `ProtocolError::InvalidState` when the first server message is
    /// not `PullReady`, when a pack chunk lacks its transfer checkpoint or
    /// stream kind, when a sidecar key does not parse, when the pull completes
    /// before the pack stream finished, or when the stream ends without
    /// `PullComplete`. RPC statuses are converted with
    /// `status_to_protocol_error`; errors from the helpers called along the
    /// way are propagated.
    async fn pull_exchange(
        &mut self,
        repo: &Repository,
        repo_path: &str,
        remote_thread: &str,
        options: PullOptions<'_>,
    ) -> Result<PullExchange, ProtocolError> {
        let exchange_start = Instant::now();
        let mut exclude_states = Vec::new();
        // Which head lookup is used depends on whether a local thread was
        // named; at most one state ends up in `exclude_states`.
        let advertised_head = if let Some(local_thread) = options.local_thread {
            locally_complete_local_thread_head(repo, local_thread, options.target_state)?
        } else {
            locally_complete_pull_head(repo, remote_thread, options.target_state)?
        };
        if let Some(head) = advertised_head {
            exclude_states.push(head);
        }
        let allow_partial_fetch = options.materialization.allows_partial_fetch();
        let fresh_full_pull =
            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;

        let transfer_id = pull_transfer_id(
            repo_path,
            remote_thread,
            options.local_thread,
            options.depth,
            options.target_state,
        );
        // Absent options are transmitted as the corresponding default value
        // (empty string / zero).
        let request_message = PullMessage {
            body: Some(pull_message::Body::Request(PullRequest {
                repo_path: repo_path.to_string(),
                remote_thread: remote_thread.to_string(),
                local_thread: options.local_thread.unwrap_or_default().to_string(),
                target_state: options
                    .target_state
                    .map(|value| value.to_string_full())
                    .unwrap_or_default(),
                depth: options.depth.unwrap_or_default(),
                exclude_states: exclude_states
                    .iter()
                    .map(ChangeId::to_string_full)
                    .collect(),
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    TransportMode::NativePack,
                    0,
                    0,
                    false,
                )),
                partial_fetch_status: partial_fetch_status_for_repo(repo),
                allow_partial_fetch,
                fresh_full_pull,
                target_revision_address: options
                    .target_state
                    .map(native_revision_address)
                    .unwrap_or_default(),
                client_operation_id: String::new(),
            })),
        };

        // The channel holds at least four messages; the request is queued
        // before the RPC is opened so it is the first message on the stream.
        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
        tx.send(request_message).await.map_err(|_| {
            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
        })?;
        let mut request = Request::new(ReceiverStream::new(rx));
        self.apply_auth(&mut request)?;
        let mut response = self
            .inner
            .pull(request)
            .await
            .map_err(status_to_protocol_error)?
            .into_inner();

        // Anything other than `PullReady` (including end of stream) is a
        // protocol violation.
        let ready = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PullMessage {
                body: Some(pull_message::Body::Ready(ready)),
            }) => ready,
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PullReady from gRPC server".to_string(),
                ));
            }
        };
        let mut profile = PullProfile {
            ready_wait: exchange_start.elapsed(),
            ..PullProfile::default()
        };
        let remote_state = ChangeId::parse(&ready.remote_state)
            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
        // Captured before `objects_to_fetch` is moved into `plan_pull_wants`.
        let advertised_object_count = ready.objects_to_fetch.len();
        let PullWantPlan {
            wants,
            wanted_types,
            want_full_closure,
        } = plan_pull_wants(
            repo,
            &remote_state,
            ready.full_closure_available,
            ready.objects_to_fetch,
            allow_partial_fetch,
        )?;
        let native_pack_required = native_pack_required_for_pull(want_full_closure, &wanted_types);

        tx.send(PullMessage {
            body: Some(pull_message::Body::Want(WantObjects {
                objects: wants.clone(),
                want_full_closure,
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    TransportMode::NativePack,
                    0,
                    0,
                    false,
                )),
            })),
        })
        .await
        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
        // Dropping the only sender ends the outbound request stream; the
        // client sends nothing after the want list.
        drop(tx);

        let receive_start = Instant::now();
        // Pulls that advertise many objects receive chunks into a spool
        // created under the repository's heddle directory; smaller pulls use
        // `pack_state`.
        let use_pack_spool = advertised_object_count > PULL_PACK_SPOOL_OBJECT_THRESHOLD;
        let mut pack_state = wire::PackChunkState::default();
        let mut pack_spool = if use_pack_spool {
            Some(wire::PackChunkSpool::new_in(repo.heddle_dir())?)
        } else {
            None
        };
        let mut git_lane_repo = None;
        let mut git_pack_state = GitPackPullInstallState::default();
        let mut received = 0usize;
        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
            match message.body {
                // Native pack chunk: validate its envelope and hand the bytes
                // to the spool or to `pack_state`.
                Some(pull_message::Body::Pack(chunk)) => {
                    profile.bytes_received =
                        profile.bytes_received.saturating_add(chunk.data.len());
                    profile.pack_bytes_received =
                        profile.pack_bytes_received.saturating_add(chunk.data.len());
                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
                        ProtocolError::InvalidState(
                            "native pack chunk missing transfer checkpoint".to_string(),
                        )
                    })?;
                    // An unknown enum value is treated like an unspecified one
                    // and rejected.
                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
                        .unwrap_or(PackStreamKind::Unspecified);
                    if stream_kind == PackStreamKind::Unspecified {
                        return Err(ProtocolError::InvalidState(
                            "native pack chunk missing stream kind".to_string(),
                        ));
                    }
                    let decode_start = Instant::now();
                    if let Some(pack_spool) = pack_spool.as_mut() {
                        pack_spool.receive_chunk(
                            stream_kind == PackStreamKind::Index,
                            transfer.resume_offset,
                            transfer.chunk_index,
                            transfer.is_complete,
                            &chunk.data,
                            chunk.is_final_chunk,
                        )?;
                    } else {
                        wire::receive_pack_chunk(
                            &mut pack_state,
                            stream_kind == PackStreamKind::Index,
                            transfer.resume_offset,
                            transfer.chunk_index,
                            transfer.is_complete,
                            &chunk.data,
                            chunk.is_final_chunk,
                        )?;
                    }
                    // The same elapsed time feeds all three pack counters.
                    let decode_elapsed = decode_start.elapsed();
                    profile.pack_decode += decode_elapsed;
                    profile.pack_decode_apply += decode_elapsed;
                    profile.decode += decode_elapsed;
                }
                // Redaction sidecar: size-checked before anything else, then
                // applied to the repository keyed by blob hash.
                Some(pull_message::Body::Redaction(transfer)) => {
                    wire::check_received_transfer_blob_size(
                        transfer.redactions_blob.len(),
                        wire::MAX_RECEIVED_REDACTIONS_BLOB_SIZE,
                        "redactions",
                    )?;
                    profile.bytes_received = profile
                        .bytes_received
                        .saturating_add(transfer.redactions_blob.len());
                    profile.object_mix.record(ObjectType::Redaction);
                    let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
                        ProtocolError::InvalidState(format!(
                            "RedactionTransfer.blob_hash is not a valid content hash: {err}"
                        ))
                    })?;
                    let decode_start = Instant::now();
                    repo.accept_wire_redactions(blob, &transfer.redactions_blob)
                        .map_err(|err| {
                            ProtocolError::InvalidState(format!(
                                "accept_wire_redactions for blob {}: {err}",
                                transfer.blob_hash
                            ))
                        })?;
                    let decode_elapsed = decode_start.elapsed();
                    profile.store_receive_object += decode_elapsed;
                }
                // State-visibility sidecar: same shape as the redaction
                // branch, keyed by state id.
                Some(pull_message::Body::StateVisibility(transfer)) => {
                    wire::check_received_transfer_blob_size(
                        transfer.state_visibility_blob.len(),
                        wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
                        "state-visibility",
                    )?;
                    profile.bytes_received = profile
                        .bytes_received
                        .saturating_add(transfer.state_visibility_blob.len());
                    profile.object_mix.record(ObjectType::StateVisibility);
                    let state = ChangeId::parse(&transfer.state_id).map_err(|err| {
                        ProtocolError::InvalidState(format!(
                            "StateVisibilityTransfer.state_id is not a valid ChangeId: {err}"
                        ))
                    })?;
                    let decode_start = Instant::now();
                    repo.accept_wire_state_visibility(state, &transfer.state_visibility_blob)
                        .map_err(|err| {
                            ProtocolError::InvalidState(format!(
                                "accept_wire_state_visibility for state {}: {err}",
                                transfer.state_id
                            ))
                        })?;
                    let decode_elapsed = decode_start.elapsed();
                    profile.store_receive_object += decode_elapsed;
                }
                // Git-lane transfer: delegated entirely to
                // `accept_git_lane_pull_transfer`.
                Some(pull_message::Body::GitLane(transfer)) => {
                    let decode_start = Instant::now();
                    profile.bytes_received = profile
                        .bytes_received
                        .saturating_add(git_lane_transfer_size(&transfer));
                    accept_git_lane_pull_transfer(
                        repo,
                        &mut git_lane_repo,
                        &mut git_pack_state,
                        transfer,
                    )?;
                    let decode_elapsed = decode_start.elapsed();
                    profile.store_receive_object += decode_elapsed;
                }
                // Terminal message: both the success and the failure path
                // return from inside this arm.
                Some(pull_message::Body::Complete(complete)) => {
                    profile.receive_and_apply = receive_start.elapsed();
                    git_pack_state.ensure_idle()?;
                    let final_state = if complete.new_state.is_empty() {
                        None
                    } else {
                        Some(
                            ChangeId::parse(&complete.new_state)
                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
                        )
                    };

                    if complete.success {
                        if native_pack_required {
                            let store_start = Instant::now();
                            // Refuse to install a pack whose stream did not
                            // finish, whichever receiver was in use.
                            let installed_ids = if let Some(pack_spool) = pack_spool.as_mut() {
                                if !pack_spool.is_complete() {
                                    return Err(ProtocolError::InvalidState(
                                        "pull completed before native pack stream finished"
                                            .to_string(),
                                    ));
                                }
                                pack_spool.install_into(repo.store())?
                            } else {
                                if !pack_state.is_complete() {
                                    return Err(ProtocolError::InvalidState(
                                        "pull completed before native pack stream finished"
                                            .to_string(),
                                    ));
                                }
                                wire::install_received_pack(
                                    repo.store(),
                                    &pack_state.pack_data,
                                    &pack_state.index_data,
                                )?
                            };
                            profile.store_receive_object += store_start.elapsed();
                            received = installed_ids.len();
                            // Classify every installed object: prefer the type
                            // that was requested, otherwise derive it from the
                            // id kind or from the store.
                            for id in installed_ids {
                                match (id, wanted_packable_type(&wanted_types, &id)) {
                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
                                        profile.object_mix.record(ObjectType::Blob);
                                        repo.clear_missing_blob(&hash)?;
                                    }
                                    (_, Some(obj_type)) => {
                                        profile.object_mix.record(obj_type);
                                    }
                                    (PackObjectId::ChangeId(_), None) => {
                                        profile.object_mix.record(ObjectType::State);
                                    }
                                    (PackObjectId::Hash(hash), None) => {
                                        let inferred =
                                            infer_installed_hash_object_type(repo, &hash)?;
                                        profile.object_mix.record(inferred);
                                    }
                                }
                            }
                        }

                        let metadata_start = Instant::now();
                        // The local thread ref moves only when both a local
                        // thread was named and the server reported a state.
                        if let Some(local_thread) = options.local_thread
                            && let Some(state) = final_state
                        {
                            repo.refs()
                                .set_thread(&ThreadName::from(local_thread), &state)?;
                        }
                        // Partial fetches mark missing blobs for the final
                        // state; other pulls that reported a state clear all
                        // missing-blob records.
                        if let Some(state) = final_state
                            && allow_partial_fetch
                        {
                            mark_missing_blobs_for_state(repo, state)?;
                        } else if final_state.is_some() {
                            let _ = repo.clear_all_missing_blobs()?;
                        }
                        // Apply the marker snapshot from the checkpoint; if
                        // there is no checkpoint or the helper returns false,
                        // fall back to `sync_local_markers`.
                        let synced_markers = complete
                            .transfer
                            .as_ref()
                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
                            .transpose()?
                            .unwrap_or(false);
                        if !synced_markers {
                            self.sync_local_markers(repo, repo_path).await?;
                        }
                        profile.metadata_sync = metadata_start.elapsed();
                        profile.objects_received = received;
                        // Every transfer-derived field falls back to a default
                        // when the server omitted the transfer checkpoint.
                        return Ok(PullExchange {
                            result: PullComplete {
                                success: true,
                                final_state,
                                error: None,
                                transfer_id: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.transfer_id.clone())
                                    .unwrap_or_default(),
                                transport_mode: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
                                    .unwrap_or("native-pack")
                                    .to_string(),
                                resume_offset: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.resume_offset)
                                    .unwrap_or_default(),
                                chunk_index: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.chunk_index)
                                    .unwrap_or_default(),
                                checkpoint: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.checkpoint.clone())
                                    .unwrap_or_default(),
                                is_complete: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.is_complete)
                                    .unwrap_or(false),
                            },
                            object_count: received,
                            profile,
                        });
                    }

                    // Failure path: nothing is installed and no refs or
                    // markers are touched; the server's error text is passed
                    // through.
                    profile.objects_received = received;
                    return Ok(PullExchange {
                        result: PullComplete {
                            success: false,
                            final_state,
                            error: (!complete.error.is_empty()).then_some(complete.error),
                            transfer_id: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.transfer_id.clone())
                                .unwrap_or_default(),
                            transport_mode: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transport_mode_name(transfer.transport_mode))
                                .unwrap_or("native-pack")
                                .to_string(),
                            resume_offset: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.resume_offset)
                                .unwrap_or_default(),
                            chunk_index: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.chunk_index)
                                .unwrap_or_default(),
                            checkpoint: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.checkpoint.clone())
                                .unwrap_or_default(),
                            is_complete: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.is_complete)
                                .unwrap_or(false),
                        },
                        object_count: received,
                        profile,
                    });
                }
                // Any other message body (or an empty one) is ignored.
                _ => {}
            }
        }

        // The stream ended without `PullComplete`.
        // NOTE(review): `received` is only assigned inside the `Complete` arm,
        // which always returns, so this message reports 0 — confirm whether a
        // running count was intended.
        Err(ProtocolError::InvalidState(format!(
            "pull stream ended unexpectedly after receiving {received} packed objects"
        )))
    }
1134}
1135
1136fn redaction_push_message(
1137 repo: &Repository,
1138 info: wire::ObjectInfo,
1139) -> Result<PushMessage, ProtocolError> {
1140 let wire::ObjectId::Hash(blob) = info.id else {
1141 return Err(ProtocolError::InvalidState(
1142 "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
1143 ));
1144 };
1145 let hex = blob.to_hex();
1146 let bytes = repo
1151 .store()
1152 .get_redactions_bytes_for_blob(&blob)
1153 .map_err(|err| {
1154 ProtocolError::InvalidState(format!("load redactions sidecar for {}: {err}", hex))
1155 })?
1156 .ok_or_else(|| {
1157 ProtocolError::InvalidState(format!(
1158 "server wants redaction for blob {} but sender has no sidecar",
1159 hex
1160 ))
1161 })?;
1162 Ok(PushMessage {
1163 body: Some(push_message::Body::Redaction(RedactionTransfer {
1164 blob_hash: hex,
1165 redactions_blob: bytes.into(),
1166 })),
1167 })
1168}
1169
1170fn is_out_of_pack_transfer_object_type(obj_type: ObjectType) -> bool {
1171 matches!(
1172 obj_type,
1173 ObjectType::Redaction | ObjectType::StateVisibility
1174 )
1175}
1176
1177fn native_pack_required_for_pull(want_full_closure: bool, wanted_types: &WantedTypes) -> bool {
1178 want_full_closure
1179 || wanted_types
1180 .values()
1181 .flatten()
1182 .copied()
1183 .any(wire::is_native_packable_object_type)
1184}
1185
1186fn object_info_from_plan(object: &PlannedObject) -> ObjectInfo {
1187 ObjectInfo {
1188 id: object.id.clone(),
1189 obj_type: object.obj_type,
1190 size: 0,
1191 delta_base: None,
1192 }
1193}
1194
1195fn to_proto_planned_object(object: &PlannedObject) -> ObjectDescriptor {
1196 object_descriptor_with_status(
1197 &object_info_from_plan(object),
1198 ObjectAvailabilityStatus::Present,
1199 "",
1200 )
1201}
1202
1203fn descriptor_id_from_plan(object: &PlannedObject) -> (String, String) {
1204 let id = match &object.id {
1205 wire::ObjectId::Hash(hash) => hash.to_hex(),
1206 wire::ObjectId::ChangeId(change_id) => change_id.to_string_full(),
1207 };
1208 (id, object_type_name(object.obj_type).to_string())
1209}
1210
1211fn record_wanted_type(wanted_types: &mut WantedTypes, pack_id: PackObjectId, obj_type: ObjectType) {
1212 let types = wanted_types.entry(pack_id).or_default();
1213 if !types.contains(&obj_type) {
1214 types.push(obj_type);
1215 }
1216}
1217
1218fn wanted_packable_type(wanted_types: &WantedTypes, pack_id: &PackObjectId) -> Option<ObjectType> {
1219 wanted_types.get(pack_id).and_then(|types| {
1220 types
1221 .iter()
1222 .copied()
1223 .find(|obj_type| wire::is_native_packable_object_type(*obj_type))
1224 })
1225}
1226
1227fn sidecar_push_message(
1228 repo: &Repository,
1229 info: wire::ObjectInfo,
1230) -> Result<PushMessage, ProtocolError> {
1231 match info.obj_type {
1232 ObjectType::Redaction => redaction_push_message(repo, info),
1233 ObjectType::StateVisibility => state_visibility_push_message(repo, info),
1234 obj_type => Err(ProtocolError::InvalidState(format!(
1235 "{obj_type:?} is not an out-of-pack sidecar object"
1236 ))),
1237 }
1238}
1239
1240fn state_visibility_push_message(
1241 repo: &Repository,
1242 info: wire::ObjectInfo,
1243) -> Result<PushMessage, ProtocolError> {
1244 let wire::ObjectId::ChangeId(state) = info.id else {
1245 return Err(ProtocolError::InvalidState(
1246 "wanted StateVisibility must be keyed by ObjectId::ChangeId(state)".to_string(),
1247 ));
1248 };
1249 let state_id = state.to_string_full();
1250 let bytes = repo
1251 .get_state_visibility_bytes_for_state(&state)
1252 .map_err(|err| {
1253 ProtocolError::InvalidState(format!(
1254 "load state-visibility sidecar for {}: {err}",
1255 state_id
1256 ))
1257 })?
1258 .ok_or_else(|| {
1259 ProtocolError::InvalidState(format!(
1260 "server wants state visibility for state {} but sender has no sidecar",
1261 state_id
1262 ))
1263 })?;
1264 Ok(PushMessage {
1265 body: Some(push_message::Body::StateVisibility(
1266 StateVisibilityTransfer {
1267 state_id,
1268 state_visibility_blob: bytes.into(),
1269 },
1270 )),
1271 })
1272}
1273
1274fn load_thread_metadata(
1275 repo: &Repository,
1276 target_thread: &str,
1277 local_state: ChangeId,
1278) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
1279 let thread_manager = ThreadManager::new(repo.heddle_dir());
1280 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
1281}
1282
1283fn plan_pull_wants(
1284 repo: &Repository,
1285 remote_state: &ChangeId,
1286 full_closure_available: bool,
1287 objects_to_fetch: Vec<ObjectDescriptor>,
1288 allow_partial_fetch: bool,
1289) -> Result<PullWantPlan, ProtocolError> {
1290 if full_closure_available {
1291 return Ok(PullWantPlan {
1292 wants: Vec::new(),
1293 wanted_types: HashMap::new(),
1294 want_full_closure: true,
1295 });
1296 }
1297 let request_full_closure =
1298 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
1299 let mut wants = Vec::with_capacity(objects_to_fetch.len());
1300 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
1301
1302 for descriptor in objects_to_fetch {
1303 let info = parse_descriptor_to_info(descriptor)?;
1304 let pack_id = match &info.id {
1305 wire::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
1306 wire::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
1307 };
1308 let include = if request_full_closure {
1309 true
1310 } else {
1311 let has = wire::has_object(repo.store(), &info)?;
1312 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
1313 };
1314
1315 if include {
1316 record_wanted_type(&mut wanted_types, pack_id, info.obj_type);
1317 wants.push(object_descriptor_with_status(
1318 &info,
1319 ObjectAvailabilityStatus::Missing,
1320 "requested by client",
1321 ));
1322 }
1323 }
1324
1325 Ok(PullWantPlan {
1326 wants,
1327 wanted_types,
1328 want_full_closure: false,
1329 })
1330}
1331
1332fn supports_compact_full_pull(
1333 repo: &Repository,
1334 allow_partial_fetch: bool,
1335 exclude_states: &[ChangeId],
1336) -> Result<bool, ProtocolError> {
1337 if allow_partial_fetch || !exclude_states.is_empty() {
1338 return Ok(false);
1339 }
1340 repo_looks_fresh(repo)
1341}
1342
1343fn locally_complete_pull_head(
1366 repo: &Repository,
1367 remote_thread: &str,
1368 target_state: Option<ChangeId>,
1369) -> Result<Option<ChangeId>, ProtocolError> {
1370 locally_complete_thread_head(repo, remote_thread, target_state)
1371}
1372
1373fn locally_complete_local_thread_head(
1382 repo: &Repository,
1383 local_thread: &str,
1384 target_state: Option<ChangeId>,
1385) -> Result<Option<ChangeId>, ProtocolError> {
1386 locally_complete_thread_head(repo, local_thread, target_state)
1387}
1388
1389fn locally_complete_thread_head(
1409 repo: &Repository,
1410 thread: &str,
1411 target_state: Option<ChangeId>,
1412) -> Result<Option<ChangeId>, ProtocolError> {
1413 if target_state.is_some() {
1416 return Ok(None);
1417 }
1418 if !repo.missing_blobs()?.is_empty() {
1421 return Ok(None);
1422 }
1423 let Some(head) = repo.refs().get_thread(&ThreadName::from(thread))? else {
1424 return Ok(None);
1427 };
1428 match wire::enumerate_state_closure(repo.store(), head) {
1433 Ok(_) => Ok(Some(head)),
1434 Err(ProtocolError::ObjectNotFound(_)) => Ok(None),
1435 Err(err) => Err(err),
1436 }
1437}
1438
1439fn should_request_full_closure(
1440 repo: &Repository,
1441 remote_state: &ChangeId,
1442 allow_partial_fetch: bool,
1443) -> Result<bool, ProtocolError> {
1444 if allow_partial_fetch || repo.store().has_state(remote_state)? {
1445 return Ok(false);
1446 }
1447 repo_looks_fresh(repo)
1448}
1449
1450fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
1451 if repo.head()?.is_some() {
1452 return Ok(false);
1453 }
1454 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1455 return Ok(false);
1456 }
1457 Ok(repo.missing_blobs()?.is_empty())
1458}
1459
1460fn infer_installed_hash_object_type(
1461 repo: &Repository,
1462 hash: &ContentHash,
1463) -> Result<ObjectType, ProtocolError> {
1464 let store = repo.store();
1465 if store.get_tree(hash)?.is_some() {
1466 return Ok(ObjectType::Tree);
1467 }
1468 if store
1469 .get_action(&objects::object::ActionId::from_hash(*hash))?
1470 .is_some()
1471 {
1472 return Ok(ObjectType::Action);
1473 }
1474 Ok(ObjectType::Blob)
1475}
1476
1477fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1478 const HEADER: &str = "heddle-markers-v1\n";
1479 if checkpoint.is_empty() {
1480 return Ok(false);
1481 }
1482 let payload = std::str::from_utf8(checkpoint)
1483 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1484 let Some(lines) = payload.strip_prefix(HEADER) else {
1485 return Ok(false);
1486 };
1487
1488 for line in lines.lines() {
1489 if line.is_empty() {
1490 continue;
1491 }
1492 let Some((name, change_id)) = line.split_once('\t') else {
1493 return Err(ProtocolError::InvalidState(
1494 "invalid marker snapshot line".to_string(),
1495 ));
1496 };
1497 let change_id = ChangeId::parse(change_id)
1498 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1499 if !repo.store().has_state(&change_id)? {
1500 continue;
1501 }
1502 let name = MarkerName::from(name);
1503 match repo.refs().get_marker(&name)? {
1504 Some(existing) if existing == change_id => {}
1505 Some(existing) => repo.refs().set_marker_cas(
1506 &name,
1507 refs::RefExpectation::Value(existing),
1508 &change_id,
1509 )?,
1510 None => repo.refs().create_marker(&name, &change_id)?,
1511 }
1512 }
1513
1514 Ok(true)
1515}
1516
1517fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1518 if s.is_empty() {
1519 return Vec::new();
1520 }
1521 objects::object::ChangeId::parse(s)
1522 .map(|id| id.as_bytes().to_vec())
1523 .unwrap_or_default()
1524}
1525
1526fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1527 ThreadMetadata {
1528 name: metadata.thread.clone(),
1529 target_thread: metadata.target_thread.clone(),
1530 parent_thread: metadata.parent_thread.clone(),
1531 task: metadata.task.clone(),
1532 thread_mode: metadata.mode.to_string(),
1533 thread_state: metadata.state.to_string(),
1534 freshness: metadata.freshness.to_string(),
1535 base_state: change_id_string_to_bytes(&metadata.base_state),
1536 base_root: change_id_string_to_bytes(&metadata.base_root),
1537 current_state: metadata
1538 .current_state
1539 .as_deref()
1540 .map(change_id_string_to_bytes),
1541 merged_state: metadata
1542 .merged_state
1543 .as_deref()
1544 .map(change_id_string_to_bytes),
1545 changed_paths: metadata.changed_paths.clone(),
1546 impact_categories: metadata
1547 .impact_categories
1548 .iter()
1549 .map(ToString::to_string)
1550 .collect(),
1551 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1552 promotion_suggested: metadata.promotion_suggested,
1553 verification_summary: Some(ThreadVerificationSummary {
1554 tests_passed: metadata.verification_summary.tests_passed,
1555 tests_failed: metadata
1556 .verification_summary
1557 .tests_failed
1558 .unwrap_or_default(),
1559 coverage_pct: metadata.verification_summary.coverage_pct,
1560 lint_warnings: metadata.verification_summary.lint_warnings,
1561 }),
1562 confidence_summary: Some(ThreadConfidenceSummary {
1563 value: metadata.confidence_summary.value,
1564 band: metadata
1565 .confidence_summary
1566 .band
1567 .as_ref()
1568 .map(ToString::to_string),
1569 }),
1570 integration_policy_result: Some(ThreadIntegrationPolicy {
1571 status: metadata
1572 .integration_policy_result
1573 .status
1574 .clone()
1575 .unwrap_or_default(),
1576 reason: metadata
1577 .integration_policy_result
1578 .reason
1579 .clone()
1580 .unwrap_or_default(),
1581 }),
1582 created_at: Some(prost_types::Timestamp {
1583 seconds: metadata.created_at.timestamp(),
1584 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1585 }),
1586 updated_at: Some(prost_types::Timestamp {
1587 seconds: metadata.updated_at.timestamp(),
1588 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1589 }),
1590 }
1591}
1592
/// Result of one pull stream exchange with the server.
struct PullExchange {
    /// Completion summary built from the server's final message.
    result: PullComplete,
    /// Number of packed objects received during the exchange.
    object_count: usize,
    /// Profiling data collected while pulling.
    // NOTE(review): `PullProfile` is defined outside this view — confirm its contents.
    profile: PullProfile,
}
1598
1599fn mark_missing_blobs_for_state(
1600 repo: &Repository,
1601 state_id: ChangeId,
1602) -> Result<(), ProtocolError> {
1603 let state = repo
1604 .store()
1605 .get_state(&state_id)?
1606 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1607 let mut missing = collect_missing_blobs(repo, &state.tree)?;
1608 if let Some(context_root) = state.context.as_ref() {
1609 missing.extend(collect_missing_blobs(repo, context_root)?);
1610 }
1611 if let Some(discussions_blob) = state.discussions.as_ref()
1612 && !repo.store().has_blob(discussions_blob)?
1613 {
1614 missing.push(*discussions_blob);
1615 }
1616 missing
1617 .into_iter()
1618 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1619}
1620
1621fn clear_missing_blobs_for_state(
1622 repo: &Repository,
1623 state_id: ChangeId,
1624) -> Result<(), ProtocolError> {
1625 let state = repo
1626 .store()
1627 .get_state(&state_id)?
1628 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1629 let mut missing = collect_missing_blobs(repo, &state.tree)?;
1630 if let Some(context_root) = state.context.as_ref() {
1631 missing.extend(collect_missing_blobs(repo, context_root)?);
1632 }
1633 if let Some(discussions_blob) = state.discussions.as_ref() {
1634 missing.push(*discussions_blob);
1635 }
1636 missing
1637 .into_iter()
1638 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1639}
1640
1641fn collect_missing_blobs(
1642 repo: &Repository,
1643 tree_hash: &ContentHash,
1644) -> Result<Vec<ContentHash>, ProtocolError> {
1645 let mut missing = Vec::new();
1646 collect_missing_blobs_recursive(repo, tree_hash, &mut missing)?;
1647 Ok(missing)
1648}
1649
1650fn collect_missing_blobs_recursive(
1651 repo: &Repository,
1652 tree_hash: &ContentHash,
1653 missing: &mut Vec<ContentHash>,
1654) -> Result<(), ProtocolError> {
1655 let Some(tree) = repo.store().get_tree(tree_hash).map_err(|err| {
1656 ProtocolError::InvalidState(format!(
1657 "load tree {} while collecting lazy hydration missing blobs: {err}",
1658 tree_hash.to_hex()
1659 ))
1660 })?
1661 else {
1662 return Ok(());
1663 };
1664 for entry in tree.entries() {
1665 match entry.entry_type {
1666 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1667 if !repo.store().has_blob(&entry.hash).map_err(|err| {
1668 ProtocolError::InvalidState(format!(
1669 "check blob {} while collecting lazy hydration missing blobs: {err}",
1670 entry.hash.to_hex()
1671 ))
1672 })? {
1673 missing.push(entry.hash);
1674 }
1675 }
1676 objects::object::EntryType::Tree => {
1677 collect_missing_blobs_recursive(repo, &entry.hash, missing)?;
1678 }
1679 }
1680 }
1681 Ok(())
1682}
1683
1684fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1685 match repo.missing_blobs() {
1686 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1687 Ok(_) => PartialFetchStatus::Disabled as i32,
1688 Err(_) => PartialFetchStatus::Unspecified as i32,
1689 }
1690}
1691
1692fn pull_transfer_id(
1693 repo_path: &str,
1694 remote_thread: &str,
1695 local_thread: Option<&str>,
1696 depth: Option<u32>,
1697 target_state: Option<ChangeId>,
1698) -> String {
1699 format!(
1700 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1701 local_thread.unwrap_or_default(),
1702 target_state
1703 .map(|value| value.to_string_full())
1704 .unwrap_or_default()
1705 )
1706}
1707
1708fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1709 format!(
1710 "push:{repo_path}:{}:{target_thread}",
1711 local_state.to_string_full()
1712 )
1713}
1714
1715fn native_revision_address(change_id: ChangeId) -> String {
1716 RevisionAddress::heddle(change_id).to_string()
1717}
1718
1719fn git_revision_address(commit_oid: &GitObjectId) -> String {
1720 RevisionAddress::git_commit(commit_oid.to_hex()).to_string()
1721}
1722
1723fn build_git_lane_push_plan(
1724 repo: &Repository,
1725 local_state: ChangeId,
1726 target_thread: &str,
1727 chunk_size: usize,
1728) -> Result<GitLanePushPlan, ProtocolError> {
1729 if repo.capability() != RepositoryCapability::GitOverlay {
1730 return Err(ProtocolError::InvalidState(
1731 "Git lane pushes require a git-overlay repository".to_string(),
1732 ));
1733 }
1734 let checkpoint = repo
1735 .latest_git_checkpoint_for_change(&local_state)
1736 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1737 .ok_or_else(|| {
1738 ProtocolError::InvalidState(format!(
1739 "state {} has no Git checkpoint; run `heddle checkpoint` before pushing this git-overlay spool",
1740 local_state.short()
1741 ))
1742 })?;
1743 let git_repo = repo
1744 .git_overlay_sley_repository()
1745 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
1746 .ok_or_else(|| {
1747 ProtocolError::InvalidState("git-overlay repository has no Git store".to_string())
1748 })?;
1749 let commit_oid = GitObjectId::from_hex(git_repo.object_format(), &checkpoint.git_commit)
1750 .map_err(|err| {
1751 ProtocolError::InvalidState(format!(
1752 "checkpoint {} has invalid Git commit oid: {err}",
1753 checkpoint.git_commit
1754 ))
1755 })?;
1756 git_repo.read_commit(&commit_oid).map_err(|err| {
1757 ProtocolError::InvalidState(format!(
1758 "checkpoint {} is not a readable Git commit: {err}",
1759 checkpoint.git_commit
1760 ))
1761 })?;
1762
1763 let pack = build_git_lane_pack_plan(&git_repo, commit_oid, chunk_size)?;
1764 let ref_update = git_ref_update_message(local_state, target_thread, commit_oid, &checkpoint)?;
1765
1766 Ok(GitLanePushPlan {
1767 local_revision_address: git_revision_address(&commit_oid),
1768 pack,
1769 ref_update,
1770 })
1771}
1772
1773fn build_git_lane_pack_plan(
1774 git_repo: &SleyRepository,
1775 root: GitObjectId,
1776 chunk_size: usize,
1777) -> Result<GitPackPushPlan, ProtocolError> {
1778 let objects = git_repo.objects();
1779 let mut sink = io::sink();
1780 let pack = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
1781 objects.as_ref(),
1782 git_repo.object_format(),
1783 [root],
1784 &HashSet::new(),
1785 &mut sink,
1786 )
1787 .map_err(|err| {
1788 ProtocolError::InvalidState(format!(
1789 "plan reachable Git pack stream for {}: {err}",
1790 root.to_hex()
1791 ))
1792 })?
1793 .ok_or_else(|| {
1794 ProtocolError::InvalidState(format!(
1795 "checkpoint {} did not produce a reachable Git pack",
1796 root.to_hex()
1797 ))
1798 })?;
1799 if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
1800 return Err(ProtocolError::InvalidState(format!(
1801 "Git pack exceeds maximum transfer size of {} bytes",
1802 wire::MAX_RECEIVED_GIT_PACK_SIZE
1803 )));
1804 }
1805 let chunk_size = chunk_size.max(1) as u64;
1806 let chunk_count = pack.pack_size.div_ceil(chunk_size);
1807 if chunk_count > u32::MAX as u64 {
1808 return Err(ProtocolError::InvalidState(
1809 "Git pack chunk count exceeds u32".to_string(),
1810 ));
1811 }
1812 Ok(GitPackPushPlan {
1813 transfer_id: format!("git-pack:{}", root.to_hex()),
1814 pack_id: pack.checksum.as_bytes().to_vec(),
1815 pack_size: pack.pack_size,
1816 root,
1817 git_repo: git_repo.clone(),
1818 })
1819}
1820
1821async fn send_git_pack_streaming_messages(
1822 tx: &mpsc::Sender<PushMessage>,
1823 pack: &GitPackPushPlan,
1824 chunk_size: usize,
1825) -> Result<(), ProtocolError> {
1826 let tx = tx.clone();
1827 let pack = pack.clone();
1828 tokio::task::spawn_blocking(move || stream_git_pack_messages_blocking(tx, pack, chunk_size))
1829 .await
1830 .map_err(|err| {
1831 ProtocolError::InvalidState(format!("Git pack streaming task failed: {err}"))
1832 })?
1833}
1834
1835fn stream_git_pack_messages_blocking(
1836 tx: mpsc::Sender<PushMessage>,
1837 pack: GitPackPushPlan,
1838 chunk_size: usize,
1839) -> Result<(), ProtocolError> {
1840 let objects = pack.git_repo.objects();
1841 let mut writer = GitPackPushMessageWriter::new(
1842 tx,
1843 pack.transfer_id.clone(),
1844 pack.pack_id.clone(),
1845 pack.pack_size,
1846 chunk_size,
1847 );
1848 let summary = sley::plumbing::sley_odb::write_reachable_pack_to_writer(
1849 objects.as_ref(),
1850 pack.git_repo.object_format(),
1851 [pack.root],
1852 &HashSet::new(),
1853 &mut writer,
1854 )
1855 .map_err(|err| {
1856 ProtocolError::InvalidState(format!(
1857 "stream reachable Git pack for {}: {err}",
1858 pack.root.to_hex()
1859 ))
1860 })?
1861 .ok_or_else(|| {
1862 ProtocolError::InvalidState(format!(
1863 "checkpoint {} did not produce a reachable Git pack",
1864 pack.root.to_hex()
1865 ))
1866 })?;
1867 writer.finish()?;
1868 if summary.pack_size != pack.pack_size || summary.checksum.as_bytes() != pack.pack_id.as_slice()
1869 {
1870 return Err(ProtocolError::InvalidState(format!(
1871 "Git pack stream changed while sending {}; expected {} bytes/{}, got {} bytes/{}",
1872 pack.root.to_hex(),
1873 pack.pack_size,
1874 hex::encode(&pack.pack_id),
1875 summary.pack_size,
1876 summary.checksum.to_hex()
1877 )));
1878 }
1879 Ok(())
1880}
1881
1882struct GitPackPushMessageWriter {
1883 tx: mpsc::Sender<PushMessage>,
1884 transfer_id: String,
1885 pack_id: Vec<u8>,
1886 pack_size: u64,
1887 chunk_size: usize,
1888 buffer: Vec<u8>,
1889 offset: u64,
1890 chunk_index: u32,
1891}
1892
1893impl GitPackPushMessageWriter {
1894 fn new(
1895 tx: mpsc::Sender<PushMessage>,
1896 transfer_id: String,
1897 pack_id: Vec<u8>,
1898 pack_size: u64,
1899 chunk_size: usize,
1900 ) -> Self {
1901 let chunk_size = chunk_size.max(1);
1902 Self {
1903 tx,
1904 transfer_id,
1905 pack_id,
1906 pack_size,
1907 chunk_size,
1908 buffer: Vec::with_capacity(chunk_size),
1909 offset: 0,
1910 chunk_index: 0,
1911 }
1912 }
1913
1914 fn send_buffer(&mut self) -> io::Result<()> {
1915 if self.buffer.is_empty() {
1916 return Ok(());
1917 }
1918 let chunk = std::mem::take(&mut self.buffer);
1919 let next_offset = self.offset.checked_add(chunk.len() as u64).ok_or_else(|| {
1920 io::Error::new(io::ErrorKind::InvalidData, "Git pack offset overflow")
1921 })?;
1922 if next_offset > self.pack_size {
1923 return Err(io::Error::new(
1924 io::ErrorKind::InvalidData,
1925 format!(
1926 "Git pack stream exceeded planned size {}; got at least {}",
1927 self.pack_size, next_offset
1928 ),
1929 ));
1930 }
1931 let is_final_chunk = next_offset == self.pack_size;
1932 self.tx
1933 .blocking_send(git_lane_push_message(git_lane_transfer::Body::Pack(
1934 GitPackTransfer {
1935 transfer_id: self.transfer_id.clone(),
1936 offset: self.offset,
1937 chunk_index: self.chunk_index,
1938 is_final_chunk,
1939 pack_size: self.pack_size,
1940 pack_chunk: chunk.into(),
1941 pack_id: self.pack_id.clone().into(),
1942 },
1943 )))
1944 .map_err(|_| {
1945 io::Error::new(io::ErrorKind::BrokenPipe, "push stream closed unexpectedly")
1946 })?;
1947 self.offset = next_offset;
1948 self.chunk_index = self.chunk_index.checked_add(1).ok_or_else(|| {
1949 io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk index overflow")
1950 })?;
1951 Ok(())
1952 }
1953
1954 fn finish(mut self) -> Result<(), ProtocolError> {
1955 self.send_buffer().map_err(|err| {
1956 ProtocolError::InvalidState(format!("send final Git pack chunk: {err}"))
1957 })?;
1958 if self.offset != self.pack_size {
1959 return Err(ProtocolError::InvalidState(format!(
1960 "Git pack stream length mismatch: expected {}, got {}",
1961 self.pack_size, self.offset
1962 )));
1963 }
1964 Ok(())
1965 }
1966}
1967
1968impl Write for GitPackPushMessageWriter {
1969 fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
1970 let written = buf.len();
1971 while !buf.is_empty() {
1972 if self.offset == self.pack_size {
1973 return Err(io::Error::new(
1974 io::ErrorKind::InvalidData,
1975 format!("Git pack stream wrote past planned size {}", self.pack_size),
1976 ));
1977 }
1978 let capacity = self
1979 .chunk_size
1980 .checked_sub(self.buffer.len())
1981 .ok_or_else(|| {
1982 io::Error::new(io::ErrorKind::InvalidData, "Git pack chunk buffer overflow")
1983 })?;
1984 let take = capacity.min(buf.len());
1985 self.buffer.extend_from_slice(&buf[..take]);
1986 buf = &buf[take..];
1987 if self.buffer.len() == self.chunk_size {
1988 self.send_buffer()?;
1989 }
1990 }
1991 Ok(written)
1992 }
1993
1994 fn flush(&mut self) -> io::Result<()> {
1995 Ok(())
1996 }
1997}
1998
1999fn git_ref_update_message(
2000 local_state: ChangeId,
2001 target_thread: &str,
2002 commit_oid: GitObjectId,
2003 checkpoint: &GitCheckpointRecord,
2004) -> Result<PushMessage, ProtocolError> {
2005 let metadata_json = serde_json::json!({
2006 "source": "heddle-checkpoint",
2007 "summary": checkpoint.summary,
2008 "committed_at": checkpoint.committed_at,
2009 })
2010 .to_string();
2011 Ok(git_lane_push_message(git_lane_transfer::Body::RefUpdate(
2012 GitRefUpdateTransfer {
2013 name: format!("refs/heads/{target_thread}"),
2014 kind: GitRefKind::Branch as i32,
2015 target_oid: commit_oid.as_bytes().to_vec().into(),
2016 peeled_oid: Vec::new().into(),
2017 expected_missing: false,
2018 expected_target_oid: Vec::new().into(),
2019 checkpoint: Some(GitCheckpointTransfer {
2020 heddle_change_id: local_state.as_bytes().to_vec().into(),
2021 git_commit_oid: commit_oid.as_bytes().to_vec().into(),
2022 thread: target_thread.to_string(),
2023 metadata_json,
2024 }),
2025 },
2026 )))
2027}
2028
2029fn apply_git_ref_expectation(
2030 message: &mut PushMessage,
2031 remote_revision_address: &str,
2032) -> Result<(), ProtocolError> {
2033 let expectation = parse_git_ref_expectation(remote_revision_address)?;
2034 let Some(push_message::Body::GitLane(GitLaneTransfer {
2035 body: Some(git_lane_transfer::Body::RefUpdate(update)),
2036 })) = message.body.as_mut()
2037 else {
2038 return Err(ProtocolError::InvalidState(
2039 "Git lane push plan missing ref update message".to_string(),
2040 ));
2041 };
2042 match &expectation {
2043 GitRefRemoteExpectation::Missing => {
2044 update.expected_missing = true;
2045 update.expected_target_oid = Vec::new().into();
2046 }
2047 GitRefRemoteExpectation::Value(oid) => {
2048 update.expected_missing = false;
2049 update.expected_target_oid = oid.clone().into();
2050 }
2051 }
2052 Ok(())
2053}
2054
/// What the client expects the remote Git ref to be before a ref update.
#[derive(Clone, Debug, PartialEq, Eq)]
enum GitRefRemoteExpectation {
    /// The remote ref is expected not to exist.
    Missing,
    /// The remote ref is expected to point at this raw Git object id.
    Value(Vec<u8>),
}
2060
2061fn parse_git_ref_expectation(
2062 remote_revision_address: &str,
2063) -> Result<GitRefRemoteExpectation, ProtocolError> {
2064 if remote_revision_address.is_empty() || remote_revision_address.starts_with("heddle:") {
2065 return Ok(GitRefRemoteExpectation::Missing);
2066 }
2067 let Some(hex_oid) = remote_revision_address.strip_prefix("git:") else {
2068 return Err(ProtocolError::InvalidState(format!(
2069 "server returned unsupported remote_revision_address {remote_revision_address:?}"
2070 )));
2071 };
2072 let oid = hex::decode(hex_oid).map_err(|err| {
2073 ProtocolError::InvalidState(format!(
2074 "server returned invalid Git remote_revision_address: {err}"
2075 ))
2076 })?;
2077 match oid.len() {
2078 20 | 32 => Ok(GitRefRemoteExpectation::Value(oid)),
2079 len => Err(ProtocolError::InvalidState(format!(
2080 "server returned Git remote_revision_address with {len} bytes; expected SHA-1 or SHA-256"
2081 ))),
2082 }
2083}
2084
2085fn git_lane_push_message(body: git_lane_transfer::Body) -> PushMessage {
2086 PushMessage {
2087 body: Some(push_message::Body::GitLane(GitLaneTransfer {
2088 body: Some(body),
2089 })),
2090 }
2091}
2092
2093fn git_lane_transfer_size(transfer: &GitLaneTransfer) -> usize {
2094 match transfer.body.as_ref() {
2095 Some(git_lane_transfer::Body::Pack(pack)) => pack.pack_chunk.len(),
2096 Some(git_lane_transfer::Body::RefUpdate(update)) => update
2097 .target_oid
2098 .len()
2099 .saturating_add(update.peeled_oid.len())
2100 .saturating_add(update.expected_target_oid.len())
2101 .saturating_add(
2102 update
2103 .checkpoint
2104 .as_ref()
2105 .map(git_checkpoint_transfer_size)
2106 .unwrap_or_default(),
2107 ),
2108 Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2109 git_checkpoint_transfer_size(checkpoint)
2110 }
2111 None => 0,
2112 }
2113}
2114
2115fn git_checkpoint_transfer_size(checkpoint: &GitCheckpointTransfer) -> usize {
2116 checkpoint
2117 .heddle_change_id
2118 .len()
2119 .saturating_add(checkpoint.git_commit_oid.len())
2120 .saturating_add(checkpoint.thread.len())
2121 .saturating_add(checkpoint.metadata_json.len())
2122}
2123
2124#[derive(Default)]
2125struct GitPackPullInstallState {
2126 active: Option<GitPackPullInstall>,
2127}
2128
2129impl GitPackPullInstallState {
2130 fn ensure_idle(&self) -> Result<(), ProtocolError> {
2131 if self.active.is_none() {
2132 Ok(())
2133 } else {
2134 Err(ProtocolError::InvalidState(
2135 "Git pack transfer ended before final chunk".to_string(),
2136 ))
2137 }
2138 }
2139
2140 fn receive_chunk(
2141 &mut self,
2142 git_repo: &SleyRepository,
2143 pack: GitPackTransfer,
2144 ) -> Result<(), ProtocolError> {
2145 if pack.transfer_id.is_empty() {
2146 return Err(ProtocolError::InvalidState(
2147 "Git pack transfer_id is required".to_string(),
2148 ));
2149 }
2150 if pack.pack_size > wire::MAX_RECEIVED_GIT_PACK_SIZE {
2151 return Err(ProtocolError::InvalidState(format!(
2152 "Git pack exceeds maximum transfer size of {} bytes",
2153 wire::MAX_RECEIVED_GIT_PACK_SIZE
2154 )));
2155 }
2156 if pack.pack_chunk.is_empty() {
2157 return Err(ProtocolError::InvalidState(
2158 "Git pack chunk must not be empty".to_string(),
2159 ));
2160 }
2161 let pack_id =
2162 GitObjectId::from_raw(git_repo.object_format(), &pack.pack_id).map_err(|err| {
2163 ProtocolError::InvalidState(format!("GitPackTransfer.pack_id: {err}"))
2164 })?;
2165 if self.active.is_none() {
2166 if pack.offset != 0 {
2167 return Err(ProtocolError::InvalidState(format!(
2168 "Git pack offset mismatch: expected 0, got {}",
2169 pack.offset
2170 )));
2171 }
2172 if pack.chunk_index != 0 {
2173 return Err(ProtocolError::InvalidState(format!(
2174 "Git pack chunk index mismatch: expected 0, got {}",
2175 pack.chunk_index
2176 )));
2177 }
2178 let writer = git_repo
2179 .objects()
2180 .begin_raw_pack_install(pack_id, pack.pack_size)
2181 .map_err(|err| {
2182 ProtocolError::InvalidState(format!("begin Git pack install: {err}"))
2183 })?;
2184 self.active = Some(GitPackPullInstall {
2185 transfer_id: pack.transfer_id.clone(),
2186 pack_id,
2187 pack_size: pack.pack_size,
2188 next_offset: 0,
2189 next_chunk_index: 0,
2190 writer,
2191 });
2192 }
2193
2194 let active = self.active.as_mut().ok_or_else(|| {
2195 ProtocolError::InvalidState("Git pack install not active".to_string())
2196 })?;
2197 active.receive_chunk(&pack, pack_id)?;
2198 if pack.is_final_chunk {
2199 let active = self.active.take().ok_or_else(|| {
2200 ProtocolError::InvalidState("Git pack install not active".to_string())
2201 })?;
2202 active.finish()?;
2203 git_repo.refresh_objects();
2204 }
2205 Ok(())
2206 }
2207}
2208
2209struct GitPackPullInstall {
2210 transfer_id: String,
2211 pack_id: GitObjectId,
2212 pack_size: u64,
2213 next_offset: u64,
2214 next_chunk_index: u32,
2215 writer: sley::plumbing::sley_odb::RawPackStreamingInstall,
2216}
2217
2218impl GitPackPullInstall {
2219 fn receive_chunk(
2220 &mut self,
2221 pack: &GitPackTransfer,
2222 pack_id: GitObjectId,
2223 ) -> Result<(), ProtocolError> {
2224 if self.transfer_id != pack.transfer_id {
2225 return Err(ProtocolError::InvalidState(format!(
2226 "Git pack transfer id changed from {:?} to {:?}",
2227 self.transfer_id, pack.transfer_id
2228 )));
2229 }
2230 if self.pack_id != pack_id {
2231 return Err(ProtocolError::InvalidState(
2232 "Git pack id changed during transfer".to_string(),
2233 ));
2234 }
2235 if self.pack_size != pack.pack_size {
2236 return Err(ProtocolError::InvalidState(
2237 "Git pack size changed during transfer".to_string(),
2238 ));
2239 }
2240 if pack.offset != self.next_offset {
2241 return Err(ProtocolError::InvalidState(format!(
2242 "Git pack offset mismatch: expected {}, got {}",
2243 self.next_offset, pack.offset
2244 )));
2245 }
2246 if pack.chunk_index != self.next_chunk_index {
2247 return Err(ProtocolError::InvalidState(format!(
2248 "Git pack chunk index mismatch: expected {}, got {}",
2249 self.next_chunk_index, pack.chunk_index
2250 )));
2251 }
2252 let chunk_len = u64::try_from(pack.pack_chunk.len()).map_err(|_| {
2253 ProtocolError::InvalidState("Git pack chunk length exceeds u64".to_string())
2254 })?;
2255 let next_offset = self
2256 .next_offset
2257 .checked_add(chunk_len)
2258 .ok_or_else(|| ProtocolError::InvalidState("Git pack offset overflow".to_string()))?;
2259 if next_offset > self.pack_size {
2260 return Err(ProtocolError::InvalidState(
2261 "Git pack chunk exceeds declared pack size".to_string(),
2262 ));
2263 }
2264 self.writer.write_all(&pack.pack_chunk).map_err(|err| {
2265 ProtocolError::InvalidState(format!("write streamed Git pack chunk: {err}"))
2266 })?;
2267 self.next_offset = next_offset;
2268 self.next_chunk_index = self.next_chunk_index.checked_add(1).ok_or_else(|| {
2269 ProtocolError::InvalidState("Git pack chunk index overflow".to_string())
2270 })?;
2271 if pack.is_final_chunk {
2272 if self.next_offset != self.pack_size {
2273 return Err(ProtocolError::InvalidState(format!(
2274 "Git pack final size mismatch: declared {}, received {}",
2275 self.pack_size, self.next_offset
2276 )));
2277 }
2278 } else if self.next_offset == self.pack_size {
2279 return Err(ProtocolError::InvalidState(
2280 "Git pack reached declared size without final chunk marker".to_string(),
2281 ));
2282 }
2283 Ok(())
2284 }
2285
2286 fn finish(self) -> Result<(), ProtocolError> {
2287 self.writer
2288 .finish()
2289 .map_err(|err| ProtocolError::InvalidState(format!("install Git pack: {err}")))?;
2290 Ok(())
2291 }
2292}
2293
2294fn accept_git_lane_pull_transfer(
2295 repo: &Repository,
2296 git_repo: &mut Option<SleyRepository>,
2297 git_pack_state: &mut GitPackPullInstallState,
2298 transfer: GitLaneTransfer,
2299) -> Result<(), ProtocolError> {
2300 if repo.capability() != RepositoryCapability::GitOverlay {
2301 return Err(ProtocolError::InvalidState(format!(
2302 "received git-lane pull transfer for non-GitOverlay repository (capability {:?})",
2303 repo.capability()
2304 )));
2305 }
2306 match transfer.body {
2307 Some(git_lane_transfer::Body::Pack(pack)) => {
2308 accept_git_lane_pack(repo, git_repo, git_pack_state, pack)
2309 }
2310 Some(git_lane_transfer::Body::RefUpdate(update)) => {
2311 accept_git_lane_ref_update(repo, git_repo, update)
2312 }
2313 Some(git_lane_transfer::Body::Checkpoint(checkpoint)) => {
2314 record_git_lane_checkpoint(repo, git_lane_sley_repository(repo, git_repo)?, checkpoint)
2315 }
2316 None => Err(ProtocolError::InvalidState(
2317 "GitLaneTransfer body is required".to_string(),
2318 )),
2319 }
2320}
2321
2322fn accept_git_lane_pack(
2323 repo: &Repository,
2324 git_repo: &mut Option<SleyRepository>,
2325 git_pack_state: &mut GitPackPullInstallState,
2326 pack: GitPackTransfer,
2327) -> Result<(), ProtocolError> {
2328 let git_repo = git_lane_sley_repository(repo, git_repo)?;
2329 git_pack_state.receive_chunk(git_repo, pack)?;
2330 Ok(())
2331}
2332
2333fn accept_git_lane_ref_update(
2334 repo: &Repository,
2335 git_repo: &mut Option<SleyRepository>,
2336 update: GitRefUpdateTransfer,
2337) -> Result<(), ProtocolError> {
2338 let git_repo = git_lane_sley_repository(repo, git_repo)?;
2339 let target = git_oid_from_bytes(
2340 git_repo,
2341 "GitRefUpdateTransfer.target_oid",
2342 &update.target_oid,
2343 )?;
2344 git_repo.read_commit(&target).map_err(|err| {
2345 ProtocolError::InvalidState(format!(
2346 "Git ref {} target commit {} is not present after pack receive: {err}",
2347 update.name,
2348 target.to_hex()
2349 ))
2350 })?;
2351 let refs = git_repo.references();
2352 let mut tx = refs.transaction();
2353 tx.update_to(
2354 update.name.clone(),
2355 ReferenceTarget::Direct(target),
2356 RefPrecondition::Any,
2357 None,
2358 );
2359 tx.commit().map_err(|err| {
2360 ProtocolError::InvalidState(format!("update Git ref {}: {err}", update.name))
2361 })?;
2362 if let Some(checkpoint) = update.checkpoint {
2363 record_git_lane_checkpoint(repo, git_repo, checkpoint)?;
2364 }
2365 Ok(())
2366}
2367
2368fn record_git_lane_checkpoint(
2369 repo: &Repository,
2370 git_repo: &SleyRepository,
2371 checkpoint: GitCheckpointTransfer,
2372) -> Result<(), ProtocolError> {
2373 let state = ChangeId::try_from_slice(&checkpoint.heddle_change_id)
2374 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2375 let commit_oid = git_oid_from_bytes(
2376 git_repo,
2377 "GitCheckpointTransfer.git_commit_oid",
2378 &checkpoint.git_commit_oid,
2379 )?;
2380 let commit_hex = commit_oid.to_hex();
2381 if repo
2382 .latest_git_checkpoint_for_change(&state)
2383 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2384 .is_some_and(|record| record.git_commit == commit_hex)
2385 {
2386 return Ok(());
2387 }
2388 let summary = serde_json::from_str::<serde_json::Value>(&checkpoint.metadata_json)
2389 .ok()
2390 .and_then(|metadata| {
2391 metadata
2392 .get("summary")
2393 .and_then(serde_json::Value::as_str)
2394 .map(str::to_string)
2395 })
2396 .unwrap_or_else(|| "pulled Git checkpoint".to_string());
2397 repo.record_git_checkpoint(&state, commit_hex, summary)
2398 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
2399 Ok(())
2400}
2401
2402fn git_lane_sley_repository<'a>(
2403 repo: &Repository,
2404 git_repo: &'a mut Option<SleyRepository>,
2405) -> Result<&'a SleyRepository, ProtocolError> {
2406 if git_repo.is_none() {
2407 *git_repo = Some(
2408 repo.git_overlay_sley_repository()
2409 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?
2410 .ok_or_else(|| {
2411 ProtocolError::InvalidState(
2412 "git-overlay repository has no Git store".to_string(),
2413 )
2414 })?,
2415 );
2416 }
2417 match git_repo.as_ref() {
2418 Some(git_repo) => Ok(git_repo),
2419 None => Err(ProtocolError::InvalidState(
2420 "git-overlay repository has no Git store".to_string(),
2421 )),
2422 }
2423}
2424
2425fn git_oid_from_bytes(
2426 git_repo: &SleyRepository,
2427 field: &str,
2428 bytes: &[u8],
2429) -> Result<GitObjectId, ProtocolError> {
2430 GitObjectId::from_raw(git_repo.object_format(), bytes)
2431 .map_err(|err| ProtocolError::InvalidState(format!("{field}: {err}")))
2432}
2433
/// Builds a native pack for `objects` and streams it to `tx` while it is still being written.
///
/// Objects are loaded on a blocking task by `load_native_pack_objects_parallel`, appended to a
/// `wire::NativePackStreamingWriter` strictly in the order of `objects`, and the pack bytes
/// available so far are forwarded as pack chunks at intervals. After the writer is finished the
/// remaining pack bytes are sent, followed by the pack index file.
///
/// # Errors
///
/// Returns a `ProtocolError` if the object count does not fit in `u64`, an object fails to
/// load, the loader stops or fails before delivering every object, the pack writer or chunk
/// readers fail, or a chunk cannot be sent on `tx`.
async fn send_native_pack_streaming_messages(
    tx: &mpsc::Sender<PushMessage>,
    repo: &Repository,
    objects: &[ObjectInfo],
    transfer_id: &str,
    chunk_size: usize,
    transport: &super::helpers::HostedTransportPolicy,
    transport_mode: TransportMode,
) -> Result<(), ProtocolError> {
    let object_count = u64::try_from(objects.len()).map_err(|_| {
        ProtocolError::InvalidState("native pack object count exceeds u64".to_string())
    })?;
    // The reader is opened on the writer's pack path so it can follow the pack as it grows.
    let mut writer = wire::NativePackStreamingWriter::new_in(repo.heddle_dir(), object_count)?;
    let mut pack_reader = wire::GrowingPackChunkReader::open(writer.pack_path(), chunk_size)?;
    // Bounded channel between the loader and this task; each item carries the object's index
    // in `objects` alongside its load result.
    let (loaded_tx, mut loaded_rx) = mpsc::channel::<(
        usize,
        Result<wire::ObjectData, ProtocolError>,
    )>(NATIVE_PACK_OBJECT_PREFETCH_LIMIT);
    let store = repo.store().clone();
    let object_plan = objects.to_vec();
    let loader = tokio::task::spawn_blocking(move || {
        load_native_pack_objects_parallel(store, object_plan, loaded_tx);
    });

    // `next_index` is the next object to append; results that arrive ahead of it wait in
    // `pending`, keyed by index, until their turn comes.
    let mut next_index = 0usize;
    let mut pending = BTreeMap::new();
    while next_index < objects.len() {
        let (index, object) = loaded_rx.recv().await.ok_or_else(|| {
            ProtocolError::InvalidState(
                "native pack object loader stopped before sending all objects".to_string(),
            )
        })?;
        pending.insert(index, object);

        // Append every consecutive object that is now available.
        while let Some(object) = pending.remove(&next_index) {
            let object = object?;
            // Forward pack bytes after an object at least one chunk large, and after every
            // NATIVE_PACK_DRAIN_OBJECT_INTERVAL-th object.
            let should_drain = object.data.len() >= chunk_size
                || (next_index + 1).is_multiple_of(NATIVE_PACK_DRAIN_OBJECT_INTERVAL);
            writer.add_object_data(object)?;
            if should_drain {
                writer.flush_pack()?;
                drain_growing_native_pack_stream(
                    tx,
                    &mut pack_reader,
                    false,
                    PackStreamKind::Pack,
                    transfer_id,
                    transport,
                    transport_mode,
                )
                .await?;
            }
            next_index += 1;
        }
    }
    // Every object has been received; surface a failure of the loader task itself.
    loader.await.map_err(|err| {
        ProtocolError::InvalidState(format!("native pack object loader task failed: {err}"))
    })?;

    // Finish the pack, send whatever pack bytes remain (this time as the final drain), then
    // send the index file as its own stream.
    let bundle = writer.finish()?;
    drain_growing_native_pack_stream(
        tx,
        &mut pack_reader,
        true,
        PackStreamKind::Pack,
        transfer_id,
        transport,
        transport_mode,
    )
    .await?;
    send_native_pack_file_stream(
        tx,
        &bundle.index_path,
        PackStreamKind::Index,
        transfer_id,
        chunk_size,
        transport,
        transport_mode,
    )
    .await
}
2515
2516fn load_native_pack_objects_parallel(
2517 store: AnyStore,
2518 objects: Vec<ObjectInfo>,
2519 tx: mpsc::Sender<(usize, Result<wire::ObjectData, ProtocolError>)>,
2520) {
2521 if objects.is_empty() {
2522 return;
2523 }
2524 let worker_count = native_pack_object_load_worker_count(objects.len());
2525 let objects = Arc::new(objects);
2526 let next_index = Arc::new(AtomicUsize::new(0));
2527
2528 std::thread::scope(|scope| {
2529 for _ in 0..worker_count {
2530 let store = store.clone();
2531 let objects = Arc::clone(&objects);
2532 let next_index = Arc::clone(&next_index);
2533 let tx = tx.clone();
2534 scope.spawn(move || {
2535 loop {
2536 let index = next_index.fetch_add(1, Ordering::Relaxed);
2537 let Some(info) = objects.get(index) else {
2538 break;
2539 };
2540 let object = wire::load_object_data(&store, &info.id, info.obj_type);
2541 if tx.blocking_send((index, object)).is_err() {
2542 break;
2543 }
2544 }
2545 });
2546 }
2547 });
2548}
2549
2550fn native_pack_object_load_worker_count(object_count: usize) -> usize {
2551 let available = std::thread::available_parallelism()
2552 .map(usize::from)
2553 .unwrap_or(1);
2554 object_count
2555 .min(available)
2556 .clamp(1, NATIVE_PACK_OBJECT_LOAD_WORKER_LIMIT)
2557}
2558
2559async fn drain_growing_native_pack_stream(
2560 tx: &mpsc::Sender<PushMessage>,
2561 reader: &mut wire::GrowingPackChunkReader,
2562 final_stream: bool,
2563 stream_kind: PackStreamKind,
2564 transfer_id: &str,
2565 transport: &super::helpers::HostedTransportPolicy,
2566 transport_mode: TransportMode,
2567) -> Result<(), ProtocolError> {
2568 while let Some((offset, chunk_index, data, is_final_chunk)) =
2569 reader.next_available_chunk(final_stream)?
2570 {
2571 send_pack_chunk(
2572 tx,
2573 stream_kind,
2574 data,
2575 transfer_id,
2576 transport,
2577 transport_mode,
2578 chunk_index,
2579 offset,
2580 is_final_chunk,
2581 )
2582 .await?;
2583 }
2584 Ok(())
2585}
2586
2587async fn send_native_pack_file_stream(
2588 tx: &mpsc::Sender<PushMessage>,
2589 path: &std::path::Path,
2590 stream_kind: PackStreamKind,
2591 transfer_id: &str,
2592 chunk_size: usize,
2593 transport: &super::helpers::HostedTransportPolicy,
2594 transport_mode: TransportMode,
2595) -> Result<(), ProtocolError> {
2596 let mut reader = wire::PackFileChunkReader::open(path, chunk_size)?;
2597 while let Some((offset, chunk_index, data, is_final_chunk)) = reader.next_chunk()? {
2598 send_pack_chunk(
2599 tx,
2600 stream_kind,
2601 data,
2602 transfer_id,
2603 transport,
2604 transport_mode,
2605 chunk_index,
2606 offset,
2607 is_final_chunk,
2608 )
2609 .await?;
2610 }
2611 Ok(())
2612}
2613
2614#[allow(clippy::too_many_arguments)]
2615async fn send_pack_chunk(
2616 tx: &mpsc::Sender<PushMessage>,
2617 stream_kind: PackStreamKind,
2618 data: Vec<u8>,
2619 transfer_id: &str,
2620 transport: &super::helpers::HostedTransportPolicy,
2621 transport_mode: TransportMode,
2622 chunk_index: u32,
2623 offset: u64,
2624 is_final_chunk: bool,
2625) -> Result<(), ProtocolError> {
2626 let chunk_length = data.len().min(u32::MAX as usize) as u32;
2627 tx.send(PushMessage {
2628 body: Some(push_message::Body::Pack(PackChunk {
2629 stream_kind: stream_kind as i32,
2630 data: data.into(),
2631 transfer: Some(transport.transfer_checkpoint_with_mode(
2632 transfer_id,
2633 transport_mode,
2634 chunk_index,
2635 offset,
2636 is_final_chunk,
2637 )),
2638 chunk_length,
2639 is_final_chunk,
2640 })),
2641 })
2642 .await
2643 .map_err(|_| ProtocolError::InvalidState("push stream closed unexpectedly".to_string()))
2644}
2645
2646fn preferred_transport_mode(
2647 transport: &super::helpers::HostedTransportPolicy,
2648 object_count: usize,
2649) -> TransportMode {
2650 let _ = transport;
2651 let _ = object_count;
2652 TransportMode::NativePack
2653}
2654
2655#[cfg(test)]
2656mod tests {
2657 use chrono::{TimeZone, Utc};
2658 use cli_shared::ClientConfig;
2659 use grpc::heddle::v1::{
2660 ListRefsRequest, ListRefsResponse, PullComplete as GrpcPullComplete, PullReady,
2661 TransferCheckpoint, UpdateRefRequest, UpdateRefResponse, push_message,
2662 repo_sync_service_server::{RepoSyncService, RepoSyncServiceServer},
2663 };
2664 use objects::{
2665 object::{
2666 Attribution, Blob, ChangeId, ContentHash, Principal, Redaction, State, StateVisibility,
2667 StateVisibilityBlob, Tree, TreeEntry, VisibilityTier,
2668 },
2669 store::ObjectStore,
2670 };
2671 use tempfile::TempDir;
2672 use tonic::{Response, Status, transport::Server};
2673 use wire::{ObjectId, ObjectInfo};
2674
2675 use super::*;
2676 use crate::grpc_hosted::helpers::{descriptor_id_from_info, to_proto_object_info};
2677
2678 fn temp_repo() -> (TempDir, Repository) {
2679 let dir = TempDir::new().expect("tempdir");
2680 let repo = Repository::init_default(dir.path()).expect("init repo");
2681 (dir, repo)
2682 }
2683
2684 fn temp_repo_unseeded() -> (TempDir, Repository) {
2688 let dir = TempDir::new().expect("tempdir");
2689 let repo = Repository::init(dir.path()).expect("init repo");
2690 (dir, repo)
2691 }
2692
2693 fn redaction_info(blob: ContentHash) -> ObjectInfo {
2694 ObjectInfo {
2695 id: ObjectId::Hash(blob),
2696 obj_type: ObjectType::Redaction,
2697 size: 0,
2698 delta_base: None,
2699 }
2700 }
2701
2702 fn state_info(state: ChangeId) -> ObjectInfo {
2703 ObjectInfo {
2704 id: ObjectId::ChangeId(state),
2705 obj_type: ObjectType::State,
2706 size: 0,
2707 delta_base: None,
2708 }
2709 }
2710
2711 fn state_visibility_info(state: ChangeId) -> ObjectInfo {
2712 ObjectInfo {
2713 id: ObjectId::ChangeId(state),
2714 obj_type: ObjectType::StateVisibility,
2715 size: 0,
2716 delta_base: None,
2717 }
2718 }
2719
2720 fn sample_blob() -> ContentHash {
2721 ContentHash::from_bytes([7u8; 32])
2722 }
2723
2724 #[test]
2725 fn descriptor_id_from_info_matches_proto_encode_path() {
2726 let infos = [
2727 redaction_info(sample_blob()),
2728 state_info(ChangeId::from_bytes([3u8; 16])),
2729 state_visibility_info(ChangeId::from_bytes([9u8; 16])),
2730 ];
2731 for info in infos {
2732 assert_eq!(
2733 descriptor_id_from_info(&info),
2734 descriptor_id(&to_proto_object_info(&info)),
2735 "keying path must stay byte-identical to the throwaway-encode path",
2736 );
2737 }
2738 }
2739
    /// Builds a one-commit Git history, streams the pack planned for that commit, and checks
    /// both the reassembled pack and the ref-update message that carries the checkpoint.
    #[test]
    fn git_lane_messages_pack_reachable_commit_graph_and_attach_checkpoint() {
        // Source history: one blob, one tree holding it, one root commit on that tree.
        let dir = TempDir::new().expect("tempdir");
        let git = sley::Repository::init(dir.path()).expect("init git");
        let blob_oid = git.write_blob(b"hello\n").expect("write blob");
        let tree = sley::TreeObject {
            entries: vec![sley::plumbing::sley_object::TreeEntry {
                mode: 0o100644,
                name: sley::BString::from(b"hello.txt"),
                oid: blob_oid,
            }],
        };
        let tree_oid = git
            .write_raw_object(sley::GitObjectType::Tree, tree.write())
            .expect("write tree");
        let commit = sley::CommitObject {
            tree: tree_oid,
            parents: Vec::new(),
            author: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
            committer: b"Tester <test@example.com> 1700000000 +0000".to_vec(),
            encoding: None,
            message: b"checkpoint\n".to_vec(),
        };
        let commit_oid = git
            .write_raw_object(sley::GitObjectType::Commit, commit.write())
            .expect("write commit");

        // Plan the pack for the commit and stream it through an in-memory channel.
        let pack = build_git_lane_pack_plan(&git, commit_oid, 64 * 1024)
            .expect("build git lane pack plan");
        assert_eq!(pack.pack_id.len(), git.object_format().raw_len());
        let (tx, mut rx) = mpsc::channel(8);
        stream_git_pack_messages_blocking(tx, pack.clone(), 64 * 1024)
            .expect("stream git lane pack");
        // Reassemble the pack bytes from the chunk messages. Inside the loop `pack` is the
        // chunk message and shadows the plan.
        let mut pack_bytes = Vec::new();
        let mut chunks = Vec::new();
        while let Some(chunk) = rx.blocking_recv() {
            let Some(push_message::Body::GitLane(GitLaneTransfer {
                body: Some(git_lane_transfer::Body::Pack(pack)),
            })) = chunk.body
            else {
                panic!("expected git pack chunk");
            };
            pack_bytes.extend_from_slice(&pack.pack_chunk);
            chunks.push(pack);
        }
        assert!(!chunks.is_empty());
        assert!(chunks.last().expect("pack chunk").is_final_chunk);
        assert_eq!(pack_bytes.len() as u64, pack.pack_size);
        // The reassembled bytes must index as a pack with the planned id and the three
        // objects written above (blob, tree, commit).
        let indexed = sley::plumbing::sley_odb::index_raw_pack(&pack_bytes, git.object_format())
            .expect("generated pack should index");
        assert_eq!(indexed.pack_id.as_bytes(), pack.pack_id.as_slice());
        assert_eq!(indexed.objects.len(), 3);

        // The ref-update message for thread "main" must target the commit and embed the
        // checkpoint for the change.
        let state = ChangeId::from_bytes([9u8; 16]);
        let checkpoint = GitCheckpointRecord {
            change_id: state.to_string_full(),
            git_commit: commit_oid.to_hex(),
            summary: "checkpoint".to_string(),
            committed_at: "2026-06-25T00:00:00Z".to_string(),
        };
        let ref_message =
            git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
        let Some(push_message::Body::GitLane(GitLaneTransfer {
            body: Some(git_lane_transfer::Body::RefUpdate(update)),
        })) = ref_message.body
        else {
            panic!("expected git ref update message");
        };
        assert_eq!(update.name, "refs/heads/main");
        assert_eq!(update.kind, GitRefKind::Branch as i32);
        assert_eq!(update.target_oid.as_ref(), commit_oid.as_bytes());
        let checkpoint = update.checkpoint.expect("checkpoint");
        assert_eq!(checkpoint.heddle_change_id.as_ref(), state.as_bytes());
        assert_eq!(checkpoint.git_commit_oid.as_ref(), commit_oid.as_bytes());
        assert_eq!(checkpoint.thread, "main");
    }
2816
2817 #[test]
2818 fn git_ref_expectation_marks_missing_when_remote_has_no_git_revision() {
2819 let state = ChangeId::from_bytes([9u8; 16]);
2820 let commit_oid = GitObjectId::from_hex(
2821 sley::ObjectFormat::Sha1,
2822 "0123456789abcdef0123456789abcdef01234567",
2823 )
2824 .expect("oid");
2825 let checkpoint = GitCheckpointRecord {
2826 change_id: state.to_string_full(),
2827 git_commit: commit_oid.to_hex(),
2828 summary: "checkpoint".to_string(),
2829 committed_at: "2026-06-25T00:00:00Z".to_string(),
2830 };
2831 let mut message =
2832 git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
2833
2834 apply_git_ref_expectation(&mut message, "").expect("missing expectation");
2835 let update = git_ref_update_from_message(&message);
2836 assert!(update.expected_missing);
2837 assert!(update.expected_target_oid.is_empty());
2838 }
2839
2840 #[test]
2841 fn git_ref_expectation_uses_remote_git_revision_oid() {
2842 let state = ChangeId::from_bytes([9u8; 16]);
2843 let commit_oid = GitObjectId::from_hex(
2844 sley::ObjectFormat::Sha1,
2845 "0123456789abcdef0123456789abcdef01234567",
2846 )
2847 .expect("oid");
2848 let remote_oid = "89abcdef012345670123456789abcdef01234567";
2849 let checkpoint = GitCheckpointRecord {
2850 change_id: state.to_string_full(),
2851 git_commit: commit_oid.to_hex(),
2852 summary: "checkpoint".to_string(),
2853 committed_at: "2026-06-25T00:00:00Z".to_string(),
2854 };
2855 let mut message =
2856 git_ref_update_message(state, "main", commit_oid, &checkpoint).expect("ref update");
2857
2858 apply_git_ref_expectation(&mut message, &format!("git:{remote_oid}"))
2859 .expect("git expectation");
2860 let update = git_ref_update_from_message(&message);
2861 assert!(!update.expected_missing);
2862 assert_eq!(hex::encode(update.expected_target_oid.as_ref()), remote_oid);
2863 }
2864
2865 #[test]
2866 fn git_pack_stream_writer_emits_ordered_chunks() {
2867 let (tx, mut rx) = mpsc::channel(4);
2868 let mut writer =
2869 GitPackPushMessageWriter::new(tx, "git-pack:test".to_string(), vec![0x42; 20], 10, 4);
2870 writer.write_all(b"abcdefghij").expect("write pack bytes");
2871 writer.finish().expect("finish pack stream");
2872
2873 let mut chunks = Vec::new();
2874 while let Some(message) = rx.blocking_recv() {
2875 let Some(push_message::Body::GitLane(GitLaneTransfer {
2876 body: Some(git_lane_transfer::Body::Pack(pack)),
2877 })) = message.body
2878 else {
2879 panic!("expected git pack chunk");
2880 };
2881 chunks.push(pack);
2882 }
2883
2884 assert_eq!(chunks.len(), 3);
2885 assert_eq!(chunks[0].offset, 0);
2886 assert_eq!(chunks[0].chunk_index, 0);
2887 assert!(!chunks[0].is_final_chunk);
2888 assert_eq!(chunks[0].pack_chunk.as_ref(), b"abcd");
2889 assert_eq!(chunks[1].offset, 4);
2890 assert_eq!(chunks[1].chunk_index, 1);
2891 assert!(!chunks[1].is_final_chunk);
2892 assert_eq!(chunks[1].pack_chunk.as_ref(), b"efgh");
2893 assert_eq!(chunks[2].offset, 8);
2894 assert_eq!(chunks[2].chunk_index, 2);
2895 assert!(chunks[2].is_final_chunk);
2896 assert_eq!(chunks[2].pack_chunk.as_ref(), b"ij");
2897 assert!(
2898 chunks
2899 .iter()
2900 .all(|chunk| chunk.pack_id.as_ref() == &[0x42; 20][..])
2901 );
2902 }
2903
2904 #[test]
2905 fn git_pack_pull_install_state_streams_pack_into_sley_store() {
2906 let source_dir = TempDir::new().expect("source tempdir");
2907 let source = sley::Repository::init(source_dir.path()).expect("init source git");
2908 let blob_oid = source.write_blob(b"streamed pull pack\n").expect("blob");
2909 let pack = sley::plumbing::sley_odb::build_reachable_pack(
2910 source.objects().as_ref(),
2911 source.object_format(),
2912 [blob_oid],
2913 &HashSet::new(),
2914 )
2915 .expect("build pack")
2916 .expect("reachable pack");
2917
2918 let dest_dir = TempDir::new().expect("dest tempdir");
2919 let dest = sley::Repository::init(dest_dir.path()).expect("init dest git");
2920 let mut state = GitPackPullInstallState::default();
2921 let chunk_size = 7usize;
2922 let mut offset = 0u64;
2923 for (chunk_index, chunk) in pack.pack.chunks(chunk_size).enumerate() {
2924 let next_offset = offset + chunk.len() as u64;
2925 state
2926 .receive_chunk(
2927 &dest,
2928 GitPackTransfer {
2929 transfer_id: "git-pack:test".to_string(),
2930 offset,
2931 chunk_index: chunk_index as u32,
2932 is_final_chunk: next_offset == pack.pack.len() as u64,
2933 pack_size: pack.pack.len() as u64,
2934 pack_chunk: chunk.to_vec().into(),
2935 pack_id: pack.checksum.as_bytes().to_vec().into(),
2936 },
2937 )
2938 .expect("receive chunk");
2939 offset = next_offset;
2940 }
2941
2942 state.ensure_idle().expect("stream should finish");
2943 let object = dest.read_object(&blob_oid).expect("read installed object");
2944 assert_eq!(object.body.as_slice(), b"streamed pull pack\n");
2945 }
2946
2947 #[test]
2948 fn accept_git_lane_pull_transfer_errors_for_non_overlay_repo() {
2949 let (_dir, repo) = temp_repo();
2950 assert_ne!(
2951 repo.capability(),
2952 RepositoryCapability::GitOverlay,
2953 "init_default repo must be non-GitOverlay for this guard test",
2954 );
2955 let mut git_repo = None;
2956 let mut state = GitPackPullInstallState::default();
2957 let transfer = GitLaneTransfer {
2958 body: Some(git_lane_transfer::Body::Pack(GitPackTransfer {
2959 transfer_id: "git-pack:test".to_string(),
2960 offset: 0,
2961 chunk_index: 0,
2962 is_final_chunk: true,
2963 pack_size: 0,
2964 pack_chunk: Vec::new().into(),
2965 pack_id: Vec::new().into(),
2966 })),
2967 };
2968 let err = accept_git_lane_pull_transfer(&repo, &mut git_repo, &mut state, transfer)
2969 .expect_err("git-lane pull to a non-GitOverlay repo must fail loud, not silently drop");
2970 assert!(
2971 matches!(err, ProtocolError::InvalidState(_)),
2972 "expected InvalidState protocol error, got {err:?}",
2973 );
2974 }
2975
2976 fn git_ref_update_from_message(message: &PushMessage) -> &GitRefUpdateTransfer {
2977 let Some(push_message::Body::GitLane(GitLaneTransfer {
2978 body: Some(git_lane_transfer::Body::RefUpdate(update)),
2979 })) = message.body.as_ref()
2980 else {
2981 panic!("expected git ref update message");
2982 };
2983 update
2984 }
2985
2986 fn loose_tree_path(repo: &Repository, hash: &ContentHash) -> std::path::PathBuf {
2987 let hex = hash.to_hex();
2988 let (prefix, rest) = hex.split_at(2);
2989 repo.heddle_dir()
2990 .join("objects")
2991 .join("trees")
2992 .join(prefix)
2993 .join(rest)
2994 }
2995
2996 fn sample_redaction(blob: ContentHash) -> Redaction {
2997 Redaction {
2998 redacted_blob: blob,
2999 state: ChangeId::from_bytes([1u8; 16]),
3000 path: "config/secrets.toml".into(),
3001 reason: "leaked credential".into(),
3002 redactor: Principal {
3003 name: "Grace Hopper".into(),
3004 email: "grace@example.com".into(),
3005 },
3006 redacted_at: Utc.with_ymd_and_hms(2026, 5, 10, 14, 33, 0).unwrap(),
3007 signature: None,
3008 purged_at: None,
3009 supersedes: None,
3010 }
3011 }
3012
3013 fn sample_state_visibility(state: ChangeId) -> StateVisibility {
3014 StateVisibility {
3015 state,
3016 tier: VisibilityTier::Restricted {
3017 scope_label: "security-embargo".into(),
3018 },
3019 embargo_until: None,
3020 declarer: Principal {
3021 name: "Grace Hopper".into(),
3022 email: "grace@example.com".into(),
3023 },
3024 declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
3025 signature: None,
3026 supersedes: None,
3027 }
3028 }
3029
3030 #[test]
3031 fn non_packable_object_types_are_in_out_of_pack_transfer_partition() {
3032 for obj_type in wire::native_pack_excluded_object_types() {
3033 assert!(
3034 is_out_of_pack_transfer_object_type(*obj_type),
3035 "{obj_type:?} is excluded from native packs but missing from the out-of-pack transfer partition"
3036 );
3037 }
3038 }
3039
3040 #[test]
3041 fn native_pack_required_tracks_packable_pull_wants() {
3042 let blob = sample_blob();
3043 let state = ChangeId::from_bytes([9u8; 16]);
3044
3045 let sidecar_only = HashMap::from([(
3046 PackObjectId::ChangeId(state),
3047 vec![ObjectType::StateVisibility],
3048 )]);
3049 assert!(!native_pack_required_for_pull(false, &sidecar_only));
3050
3051 let redaction_only =
3052 HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Redaction])]);
3053 assert!(!native_pack_required_for_pull(false, &redaction_only));
3054
3055 let packable = HashMap::from([(PackObjectId::Hash(blob), vec![ObjectType::Blob])]);
3056 assert!(native_pack_required_for_pull(false, &packable));
3057
3058 let state_with_sidecar = HashMap::from([(
3059 PackObjectId::ChangeId(state),
3060 vec![ObjectType::State, ObjectType::StateVisibility],
3061 )]);
3062 assert!(native_pack_required_for_pull(false, &state_with_sidecar));
3063 assert!(native_pack_required_for_pull(true, &HashMap::new()));
3064 }
3065
3066 #[test]
3067 fn plan_pull_wants_accumulates_state_and_visibility_for_same_change_id() {
3068 let (_dir, repo) = temp_repo();
3069 let state = ChangeId::from_bytes([9u8; 16]);
3070 let plan = plan_pull_wants(
3071 &repo,
3072 &state,
3073 false,
3074 vec![
3075 object_descriptor_with_status(
3076 &state_info(state),
3077 ObjectAvailabilityStatus::Missing,
3078 "missing state",
3079 ),
3080 object_descriptor_with_status(
3081 &state_visibility_info(state),
3082 ObjectAvailabilityStatus::Missing,
3083 "missing state visibility",
3084 ),
3085 ],
3086 false,
3087 )
3088 .expect("plan pull wants");
3089
3090 let wanted = plan
3091 .wanted_types
3092 .get(&PackObjectId::ChangeId(state))
3093 .expect("same ChangeId want entry");
3094 assert_eq!(
3095 wanted.as_slice(),
3096 &[ObjectType::State, ObjectType::StateVisibility]
3097 );
3098 assert!(native_pack_required_for_pull(
3099 plan.want_full_closure,
3100 &plan.wanted_types
3101 ));
3102 }
3103
    /// Test double for `RepoSyncService` whose `pull` offers only a state-visibility sidecar.
    #[derive(Clone)]
    struct SidecarOnlyPullService {
        /// State the scripted pull reports as remote state and as the sidecar's owner.
        state: ChangeId,
        /// Encoded state-visibility blob sent in the sidecar transfer message.
        state_visibility_blob: Vec<u8>,
    }
3109
    /// Scripted service: `list_refs` and `update_ref` return defaults, `push` returns a stream
    /// that yields nothing, and `pull` plays a fixed sidecar-only exchange.
    #[tonic::async_trait]
    impl RepoSyncService for SidecarOnlyPullService {
        /// Returns a default (empty) ref listing.
        async fn list_refs(
            &self,
            _request: tonic::Request<ListRefsRequest>,
        ) -> Result<Response<ListRefsResponse>, Status> {
            Ok(Response::new(ListRefsResponse::default()))
        }

        /// Returns a default ref-update response without inspecting the request.
        async fn update_ref(
            &self,
            _request: tonic::Request<UpdateRefRequest>,
        ) -> Result<Response<UpdateRefResponse>, Status> {
            Ok(Response::new(UpdateRefResponse::default()))
        }

        type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

        /// Returns a response stream whose sender is dropped immediately, so it yields nothing.
        async fn push(
            &self,
            _request: tonic::Request<tonic::Streaming<PushMessage>>,
        ) -> Result<Response<Self::PushStream>, Status> {
            let (_tx, rx) = mpsc::channel(1);
            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
                rx,
            )))
        }

        type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

        /// Plays the scripted pull: expect a request, answer `Ready` with one missing
        /// state-visibility descriptor, expect a matching want, then send the sidecar transfer
        /// and a successful `Complete`. Any unexpected inbound message ends the exchange with an
        /// `invalid_argument` status.
        async fn pull(
            &self,
            request: tonic::Request<tonic::Streaming<PullMessage>>,
        ) -> Result<Response<Self::PullStream>, Status> {
            let state = self.state;
            let state_visibility_blob = self.state_visibility_blob.clone();
            let (tx, rx) = mpsc::channel(4);

            tokio::spawn(async move {
                let mut inbound = request.into_inner();
                // Step 1: the first inbound message must be the pull request.
                match inbound.message().await {
                    Ok(Some(PullMessage {
                        body: Some(pull_message::Body::Request(_)),
                    })) => {}
                    other => {
                        let _ = tx
                            .send(Err(Status::invalid_argument(format!(
                                "expected pull request, got {other:?}"
                            ))))
                            .await;
                        return;
                    }
                }

                // Step 2: advertise exactly one object to fetch, the state-visibility sidecar.
                let descriptor = object_descriptor_with_status(
                    &state_visibility_info(state),
                    ObjectAvailabilityStatus::Missing,
                    "missing state visibility",
                );
                let ready = PullMessage {
                    body: Some(pull_message::Body::Ready(PullReady {
                        remote_state: state.to_string_full(),
                        objects_to_fetch: vec![descriptor],
                        transfer: None,
                        partial_fetch_status: PartialFetchStatus::Disabled as i32,
                        missing_objects: Vec::new(),
                        full_closure_available: false,
                        object_count: 1,
                        remote_revision_address: native_revision_address(state),
                    })),
                };
                if tx.send(Ok(ready)).await.is_err() {
                    return;
                }

                // Step 3: the client must want only that sidecar, without the full closure.
                match inbound.message().await {
                    Ok(Some(PullMessage {
                        body: Some(pull_message::Body::Want(want)),
                    })) if !want.want_full_closure
                        && want.objects.len() == 1
                        && want.objects[0].object_type == "state_visibility" => {}
                    other => {
                        let _ = tx
                            .send(Err(Status::invalid_argument(format!(
                                "expected sidecar-only want, got {other:?}"
                            ))))
                            .await;
                        return;
                    }
                }

                // Step 4: send the sidecar itself; no pack chunk is ever sent.
                let transfer = PullMessage {
                    body: Some(pull_message::Body::StateVisibility(
                        StateVisibilityTransfer {
                            state_id: state.to_string_full(),
                            state_visibility_blob: state_visibility_blob.into(),
                        },
                    )),
                };
                if tx.send(Ok(transfer)).await.is_err() {
                    return;
                }

                // Step 5: report successful completion.
                let complete = PullMessage {
                    body: Some(pull_message::Body::Complete(GrpcPullComplete {
                        success: true,
                        new_state: state.to_string_full(),
                        error: String::new(),
                        transfer: Some(TransferCheckpoint {
                            transfer_id: "sidecar-only-test".to_string(),
                            transport_mode: TransportMode::NativePack as i32,
                            resume_offset: 0,
                            chunk_index: 0,
                            checkpoint: b"heddle-markers-v1\n".to_vec(),
                            is_complete: true,
                        }),
                        new_revision_address: native_revision_address(state),
                    })),
                };
                let _ = tx.send(Ok(complete)).await;
            });

            Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
                rx,
            )))
        }
    }
3237
3238 async fn connect_sidecar_only_service(
3239 service: SidecarOnlyPullService,
3240 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3241 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3242 Ok(listener) => listener,
3243 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3244 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3245 return None;
3246 }
3247 Err(err) => panic!("bind test server: {err}"),
3248 };
3249 let addr = listener.local_addr().expect("local addr");
3250 let incoming = futures::stream::unfold(listener, |listener| async {
3251 match listener.accept().await {
3252 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3253 Err(err) => Some((Err(err), listener)),
3254 }
3255 });
3256
3257 let handle = tokio::spawn(async move {
3258 Server::builder()
3259 .add_service(RepoSyncServiceServer::new(service))
3260 .serve_with_incoming(incoming)
3261 .await
3262 .expect("serve sidecar-only test service");
3263 });
3264
3265 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3266 .await
3267 .expect("connect client");
3268 Some((client, handle))
3269 }
3270
3271 #[tokio::test]
3272 async fn state_visibility_sidecar_only_pull_completes_without_native_pack() {
3273 let (_dir, repo) = temp_repo();
3274 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3275 let state = State::new_snapshot(
3276 tree_hash,
3277 vec![],
3278 Attribution::human(Principal {
3279 name: "Grace Hopper".into(),
3280 email: "grace@example.com".into(),
3281 }),
3282 );
3283 let state_id = state.change_id;
3284 repo.store().put_state(&state).expect("put state");
3285 assert!(
3286 repo.get_state_visibility_bytes_for_state(&state_id)
3287 .expect("load local sidecar")
3288 .is_none(),
3289 "test starts with state present and StateVisibility sidecar absent"
3290 );
3291
3292 let state_visibility_blob =
3293 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3294 .encode()
3295 .expect("encode state visibility blob");
3296 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3297 state: state_id,
3298 state_visibility_blob,
3299 })
3300 .await
3301 else {
3302 return;
3303 };
3304
3305 let exchange = tokio::time::timeout(
3306 Duration::from_secs(5),
3307 client.pull_exchange(
3308 &repo,
3309 "owner/repo",
3310 "main",
3311 PullOptions {
3312 local_thread: None,
3313 depth: None,
3314 target_state: Some(state_id),
3315 materialization: PullMaterialization::Full,
3316 },
3317 ),
3318 )
3319 .await
3320 .expect("sidecar-only pull must not hang waiting for native pack")
3321 .expect("sidecar-only pull succeeds");
3322 server.abort();
3323
3324 assert!(exchange.result.success);
3325 assert_eq!(exchange.object_count, 0);
3326 assert_eq!(exchange.profile.pack_bytes_received, 0);
3327 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3328 assert!(
3329 repo.get_state_visibility_for_state(&state_id)
3330 .expect("load accepted sidecar")
3331 .has_record(),
3332 "pull must accept the out-of-pack StateVisibility sidecar"
3333 );
3334 }
3335
3336 #[tokio::test]
3340 async fn legitimate_large_sidecar_blob_decodes_and_installs() {
3341 let (_dir, repo) = temp_repo();
3342 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3343 let state = State::new_snapshot(
3344 tree_hash,
3345 vec![],
3346 Attribution::human(Principal {
3347 name: "Grace Hopper".into(),
3348 email: "grace@example.com".into(),
3349 }),
3350 );
3351 let state_id = state.change_id;
3352 repo.store().put_state(&state).expect("put state");
3353
3354 let mut record = sample_state_visibility(state_id);
3357 record.tier = VisibilityTier::Restricted {
3358 scope_label: "x".repeat(8 * 1024 * 1024),
3359 };
3360 let state_visibility_blob = StateVisibilityBlob::new(vec![record])
3361 .encode()
3362 .expect("encode large state visibility blob");
3363 assert!(
3364 state_visibility_blob.len() > 4 * 1024 * 1024,
3365 "blob must exceed tonic's 4 MiB default to exercise the raised decode limit"
3366 );
3367 assert!(
3368 (state_visibility_blob.len() as u64) <= wire::MAX_RECEIVED_STATE_VISIBILITY_BLOB_SIZE,
3369 "blob must stay within the legitimate sidecar receive cap"
3370 );
3371
3372 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3373 state: state_id,
3374 state_visibility_blob,
3375 })
3376 .await
3377 else {
3378 return;
3379 };
3380
3381 let exchange = tokio::time::timeout(
3382 Duration::from_secs(30),
3383 client.pull_exchange(
3384 &repo,
3385 "owner/repo",
3386 "main",
3387 PullOptions {
3388 local_thread: None,
3389 depth: None,
3390 target_state: Some(state_id),
3391 materialization: PullMaterialization::Full,
3392 },
3393 ),
3394 )
3395 .await
3396 .expect("large-sidecar pull must not hang")
3397 .expect("large but legitimate sidecar pull succeeds");
3398 server.abort();
3399
3400 assert!(exchange.result.success);
3401 assert!(
3402 repo.get_state_visibility_for_state(&state_id)
3403 .expect("load accepted sidecar")
3404 .has_record(),
3405 "pull must accept a legitimately-large StateVisibility sidecar"
3406 );
3407 }
3408
3409 #[tokio::test]
3413 async fn oversized_sidecar_blob_rejected_at_grpc_decode_boundary() {
3414 let (_dir, repo) = temp_repo();
3415 let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");
3416 let state = State::new_snapshot(
3417 tree_hash,
3418 vec![],
3419 Attribution::human(Principal {
3420 name: "Grace Hopper".into(),
3421 email: "grace@example.com".into(),
3422 }),
3423 );
3424 let state_id = state.change_id;
3425 repo.store().put_state(&state).expect("put state");
3426
3427 let oversized = vec![0u8; wire::MAX_PULL_DECODE_MESSAGE_SIZE + 1];
3430 let Some((mut client, server)) = connect_sidecar_only_service(SidecarOnlyPullService {
3431 state: state_id,
3432 state_visibility_blob: oversized,
3433 })
3434 .await
3435 else {
3436 return;
3437 };
3438
3439 let result = tokio::time::timeout(
3440 Duration::from_secs(30),
3441 client.pull_exchange(
3442 &repo,
3443 "owner/repo",
3444 "main",
3445 PullOptions {
3446 local_thread: None,
3447 depth: None,
3448 target_state: Some(state_id),
3449 materialization: PullMaterialization::Full,
3450 },
3451 ),
3452 )
3453 .await
3454 .expect("oversized-sidecar pull must not hang");
3455 server.abort();
3456
3457 let err = match result {
3458 Err(err) => err,
3459 Ok(_) => panic!("oversized sidecar PullMessage must be rejected at decode"),
3460 };
3461 let message = err.to_string();
3462 assert!(
3463 !message.contains("exceeds receive size limit"),
3464 "rejection must come from the decode-size limit, before the post-decode check: {message}"
3465 );
3466 assert!(
3467 repo.get_state_visibility_for_state(&state_id)
3468 .expect("load sidecar")
3469 .latest()
3470 .expect("resolve visibility")
3471 .is_none(),
3472 "an oversized sidecar must never be installed"
3473 );
3474 }
3475
/// Test double implementing `RepoSyncService` whose `pull` handler serves one
/// state via a native pack plus that state's StateVisibility sidecar.
#[derive(Clone)]
struct StateAndVisibilityPullService {
    /// Change id reported as the remote state and used to key the sidecar.
    state: ChangeId,
    /// Pack bundle streamed to the client as chunked pack messages.
    pack_bundle: wire::NativePackBundle,
    /// Encoded sidecar bytes sent in the StateVisibility transfer message.
    state_visibility_blob: Vec<u8>,
}
3482
// Scripted `RepoSyncService` for the state + sidecar pull test. Only `pull`
// has real behaviour; the other RPCs return defaults or an empty stream.
#[tonic::async_trait]
impl RepoSyncService for StateAndVisibilityPullService {
    /// Returns a default (empty) `ListRefsResponse`.
    async fn list_refs(
        &self,
        _request: tonic::Request<ListRefsRequest>,
    ) -> Result<Response<ListRefsResponse>, Status> {
        Ok(Response::new(ListRefsResponse::default()))
    }

    /// Returns a default `UpdateRefResponse` without touching any ref.
    async fn update_ref(
        &self,
        _request: tonic::Request<UpdateRefRequest>,
    ) -> Result<Response<UpdateRefResponse>, Status> {
        Ok(Response::new(UpdateRefResponse::default()))
    }

    type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

    /// Returns a response stream that never carries a message: the sender half
    /// is dropped when this method returns.
    async fn push(
        &self,
        _request: tonic::Request<tonic::Streaming<PushMessage>>,
    ) -> Result<Response<Self::PushStream>, Status> {
        let (_tx, rx) = mpsc::channel(1);
        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }

    type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

    /// Runs the scripted pull exchange on a spawned task:
    /// request -> ready -> want -> pack chunks -> sidecar transfer -> complete.
    ///
    /// Any unexpected inbound message is answered with `invalid_argument` and
    /// ends the exchange.
    async fn pull(
        &self,
        request: tonic::Request<tonic::Streaming<PullMessage>>,
    ) -> Result<Response<Self::PullStream>, Status> {
        let state = self.state;
        let pack_bundle = self.pack_bundle.clone();
        let state_visibility_blob = self.state_visibility_blob.clone();
        let (tx, rx) = mpsc::channel(8);

        tokio::spawn(async move {
            let mut inbound = request.into_inner();
            // Step 1: the first inbound message must be a pull request.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Request(_)),
                })) => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected pull request, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 2: advertise both the state and its sidecar as missing.
            let ready = PullMessage {
                body: Some(pull_message::Body::Ready(PullReady {
                    remote_state: state.to_string_full(),
                    objects_to_fetch: vec![
                        object_descriptor_with_status(
                            &state_info(state),
                            ObjectAvailabilityStatus::Missing,
                            "missing state",
                        ),
                        object_descriptor_with_status(
                            &state_visibility_info(state),
                            ObjectAvailabilityStatus::Missing,
                            "missing state visibility",
                        ),
                    ],
                    transfer: None,
                    partial_fetch_status: PartialFetchStatus::Disabled as i32,
                    missing_objects: Vec::new(),
                    full_closure_available: false,
                    object_count: 2,
                    remote_revision_address: native_revision_address(state),
                })),
            };
            if tx.send(Ok(ready)).await.is_err() {
                // The client went away; nothing more to do.
                return;
            }

            // Step 3: the client must want exactly two objects, one "state" and
            // one "state_visibility", and must not ask for the full closure.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Want(want)),
                })) if !want.want_full_closure
                    && want.objects.len() == 2
                    && want
                        .objects
                        .iter()
                        .any(|object| object.object_type == "state")
                    && want
                        .objects
                        .iter()
                        .any(|object| object.object_type == "state_visibility") => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected state + sidecar wants, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 4: stream the pack bundle in 16-byte chunks.
            for message in
                encode_pull_native_pack_messages(&pack_bundle, "state-and-visibility-test", 16)
            {
                if tx.send(Ok(message)).await.is_err() {
                    return;
                }
            }

            // Step 5: send the sidecar as its own message, after the pack chunks.
            let transfer = PullMessage {
                body: Some(pull_message::Body::StateVisibility(
                    StateVisibilityTransfer {
                        state_id: state.to_string_full(),
                        state_visibility_blob: state_visibility_blob.into(),
                    },
                )),
            };
            if tx.send(Ok(transfer)).await.is_err() {
                return;
            }

            // Step 6: report successful completion.
            let complete = PullMessage {
                body: Some(pull_message::Body::Complete(GrpcPullComplete {
                    success: true,
                    new_state: state.to_string_full(),
                    error: String::new(),
                    transfer: Some(TransferCheckpoint {
                        transfer_id: "state-and-visibility-test".to_string(),
                        transport_mode: TransportMode::NativePack as i32,
                        resume_offset: 0,
                        chunk_index: 0,
                        checkpoint: b"heddle-markers-v1\n".to_vec(),
                        is_complete: true,
                    }),
                    new_revision_address: native_revision_address(state),
                })),
            };
            let _ = tx.send(Ok(complete)).await;
        });

        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }
}
3632
3633 fn encode_pull_native_pack_messages(
3634 bundle: &wire::NativePackBundle,
3635 transfer_id: &str,
3636 chunk_size: usize,
3637 ) -> Vec<PullMessage> {
3638 let mut messages = Vec::new();
3639 let chunk_size = chunk_size.max(1);
3640
3641 let pack_total_chunks = wire::chunk_count(bundle.pack_data.len(), chunk_size);
3642 for chunk_index in 0..pack_total_chunks.max(1) {
3643 let Some((start, len)) =
3644 wire::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
3645 else {
3646 break;
3647 };
3648 messages.push(PullMessage {
3649 body: Some(pull_message::Body::Pack(PackChunk {
3650 stream_kind: PackStreamKind::Pack as i32,
3651 data: bundle.pack_data[start..start + len].to_vec().into(),
3652 transfer: Some(TransferCheckpoint {
3653 transfer_id: transfer_id.to_string(),
3654 transport_mode: TransportMode::NativePack as i32,
3655 resume_offset: start as u64,
3656 chunk_index: chunk_index as u32,
3657 checkpoint: Vec::new(),
3658 is_complete: chunk_index + 1 == pack_total_chunks,
3659 }),
3660 chunk_length: len as u32,
3661 is_final_chunk: chunk_index + 1 == pack_total_chunks,
3662 })),
3663 });
3664 }
3665
3666 let index_total_chunks = wire::chunk_count(bundle.index_data.len(), chunk_size);
3667 for chunk_index in 0..index_total_chunks.max(1) {
3668 let Some((start, len)) =
3669 wire::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
3670 else {
3671 break;
3672 };
3673 messages.push(PullMessage {
3674 body: Some(pull_message::Body::Pack(PackChunk {
3675 stream_kind: PackStreamKind::Index as i32,
3676 data: bundle.index_data[start..start + len].to_vec().into(),
3677 transfer: Some(TransferCheckpoint {
3678 transfer_id: transfer_id.to_string(),
3679 transport_mode: TransportMode::NativePack as i32,
3680 resume_offset: start as u64,
3681 chunk_index: chunk_index as u32,
3682 checkpoint: Vec::new(),
3683 is_complete: chunk_index + 1 == index_total_chunks,
3684 }),
3685 chunk_length: len as u32,
3686 is_final_chunk: chunk_index + 1 == index_total_chunks,
3687 })),
3688 });
3689 }
3690
3691 messages
3692 }
3693
3694 async fn connect_state_and_visibility_service(
3695 service: StateAndVisibilityPullService,
3696 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
3697 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
3698 Ok(listener) => listener,
3699 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
3700 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
3701 return None;
3702 }
3703 Err(err) => panic!("bind test server: {err}"),
3704 };
3705 let addr = listener.local_addr().expect("local addr");
3706 let incoming = futures::stream::unfold(listener, |listener| async {
3707 match listener.accept().await {
3708 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
3709 Err(err) => Some((Err(err), listener)),
3710 }
3711 });
3712
3713 let handle = tokio::spawn(async move {
3714 Server::builder()
3715 .add_service(RepoSyncServiceServer::new(service))
3716 .serve_with_incoming(incoming)
3717 .await
3718 .expect("serve state-and-visibility test service");
3719 });
3720
3721 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
3722 .await
3723 .expect("connect client");
3724 Some((client, handle))
3725 }
3726
3727 #[tokio::test]
3728 async fn state_and_visibility_same_change_id_pull_requests_pack_and_sidecar() {
3729 let (_source_dir, source_repo) = temp_repo();
3730 let (_target_dir, target_repo) = temp_repo();
3731 let tree_hash = source_repo
3732 .store()
3733 .put_tree(&Tree::new())
3734 .expect("put tree");
3735 let state = State::new_snapshot(
3736 tree_hash,
3737 vec![],
3738 Attribution::human(Principal {
3739 name: "Grace Hopper".into(),
3740 email: "grace@example.com".into(),
3741 }),
3742 );
3743 let state_id = state.change_id;
3744 source_repo
3745 .store()
3746 .put_state(&state)
3747 .expect("put source state");
3748 let state_visibility_blob =
3749 StateVisibilityBlob::new(vec![sample_state_visibility(state_id)])
3750 .encode()
3751 .expect("encode state visibility blob");
3752 source_repo
3753 .accept_wire_state_visibility(state_id, &state_visibility_blob)
3754 .expect("put source state visibility");
3755 let pack_bundle = wire::build_native_pack(source_repo.store(), &[state_info(state_id)])
3756 .expect("build state pack");
3757
3758 assert!(
3759 target_repo
3760 .store()
3761 .get_state(&state_id)
3762 .expect("load target state")
3763 .is_none(),
3764 "test starts with state absent"
3765 );
3766 assert!(
3767 target_repo
3768 .get_state_visibility_bytes_for_state(&state_id)
3769 .expect("load target sidecar")
3770 .is_none(),
3771 "test starts with StateVisibility sidecar absent"
3772 );
3773
3774 let Some((mut client, server)) =
3775 connect_state_and_visibility_service(StateAndVisibilityPullService {
3776 state: state_id,
3777 pack_bundle,
3778 state_visibility_blob,
3779 })
3780 .await
3781 else {
3782 return;
3783 };
3784
3785 let exchange = tokio::time::timeout(
3786 Duration::from_secs(5),
3787 client.pull_exchange(
3788 &target_repo,
3789 "owner/repo",
3790 "main",
3791 PullOptions {
3792 local_thread: None,
3793 depth: None,
3794 target_state: Some(state_id),
3795 materialization: PullMaterialization::Full,
3796 },
3797 ),
3798 )
3799 .await
3800 .expect("state + sidecar pull must not hang waiting for native pack")
3801 .expect("state + sidecar pull succeeds");
3802 server.abort();
3803
3804 assert!(exchange.result.success);
3805 assert_eq!(exchange.object_count, 1);
3806 assert!(exchange.profile.pack_bytes_received > 0);
3807 assert_eq!(exchange.profile.object_mix.states, 1);
3808 assert_eq!(exchange.profile.object_mix.state_visibilities, 1);
3809 assert!(
3810 target_repo
3811 .store()
3812 .get_state(&state_id)
3813 .expect("load installed state")
3814 .is_some(),
3815 "native pack must install the State"
3816 );
3817 assert!(
3818 target_repo
3819 .get_state_visibility_for_state(&state_id)
3820 .expect("load accepted sidecar")
3821 .has_record(),
3822 "pull must accept the out-of-pack StateVisibility sidecar"
3823 );
3824 }
3825
/// A tree hash this test never stores must produce an empty result, not an error.
#[test]
fn collect_missing_blobs_treats_absent_tree_as_empty() {
    let (_dir, repo) = temp_repo();
    let unknown_tree = ContentHash::from_bytes([99u8; 32]);

    let outcome = collect_missing_blobs(&repo, &unknown_tree);

    let missing = outcome.expect("absent tree is not an error");
    assert!(missing.is_empty());
}
3836
/// For a tree with one stored blob and one unstored blob, only the unstored
/// hash is reported.
#[test]
fn collect_missing_blobs_reports_only_genuinely_missing_blobs() {
    let (_dir, repo) = temp_repo();

    // One entry points at a blob that is stored, the other at a hash that is not.
    let present_hash = repo
        .store()
        .put_blob(&Blob::from("already local"))
        .expect("put blob");
    let missing_hash = ContentHash::from_bytes([42u8; 32]);
    let local_entry = TreeEntry::file("local.txt", present_hash, false).expect("present entry");
    let remote_entry = TreeEntry::file("remote.txt", missing_hash, false).expect("missing entry");
    let tree = Tree::from_entries(vec![local_entry, remote_entry]);
    let tree_hash = repo.store().put_tree(&tree).expect("put tree");

    let missing = collect_missing_blobs(&repo, &tree_hash).expect("collect missing blobs");

    assert_eq!(missing, vec![missing_hash]);
}
3853
/// Overwriting a stored tree's loose file with a junk byte must surface as
/// `ProtocolError::InvalidState` naming the tree being loaded.
#[test]
fn collect_missing_blobs_reports_corrupt_tree_read() {
    let (_dir, repo) = temp_repo();
    let tree_hash = repo.store().put_tree(&Tree::new()).expect("put tree");

    // Clobber the on-disk tree, then clear the store's recent caches before reading.
    let tree_file = loose_tree_path(&repo, &tree_hash);
    std::fs::write(tree_file, [0xc1]).expect("corrupt tree");
    repo.store().clear_recent_caches();

    let err = collect_missing_blobs(&repo, &tree_hash).expect_err("corrupt tree must fail");

    assert!(matches!(err, ProtocolError::InvalidState(_)));
    let expected_fragment = format!(
        "load tree {} while collecting lazy hydration missing blobs",
        tree_hash.to_hex()
    );
    assert!(
        err.to_string().contains(&expected_fragment),
        "unexpected error: {err}"
    );
}
3872
/// The redaction push message must carry the blob's hex hash and the exact
/// sidecar bytes stored for that blob.
#[test]
fn redaction_push_message_uses_hex_keyed_sidecar_payload() {
    let (_dir, repo) = temp_repo();
    let blob = sample_blob();
    repo.put_redaction(sample_redaction(blob))
        .expect("put redaction");
    let stored_sidecar = repo
        .store()
        .get_redactions_bytes_for_blob(&blob)
        .expect("load sidecar");
    let expected_bytes = stored_sidecar.expect("sidecar exists");

    let message = redaction_push_message(&repo, redaction_info(blob)).expect("message");

    let transfer = match message.body {
        Some(push_message::Body::Redaction(transfer)) => transfer,
        _ => panic!("expected redaction transfer"),
    };
    assert_eq!(transfer.blob_hash, blob.to_hex());
    assert_eq!(transfer.redactions_blob, expected_bytes);
}
3893
/// With no redaction stored for the blob, building the push message must fail
/// with `InvalidState` and mention the blob's hex hash.
#[test]
fn redaction_push_message_reports_missing_sidecar_with_blob_hex() {
    let (_dir, repo) = temp_repo();
    let blob = sample_blob();

    let outcome = redaction_push_message(&repo, redaction_info(blob));
    let err = outcome.expect_err("missing sidecar");

    assert!(matches!(err, ProtocolError::InvalidState(_)));
    let expected_fragment = format!(
        "server wants redaction for blob {} but sender has no sidecar",
        blob.to_hex()
    );
    assert!(
        err.to_string().contains(&expected_fragment),
        "unexpected error: {err}"
    );
}
3910
/// Placing a directory at the path `<heddle_dir>/redactions/<hex>.bin` must
/// make the push message fail with `InvalidState` and a load-error message
/// that names the blob.
#[test]
fn redaction_push_message_reports_sidecar_load_error_with_blob_hex() {
    let (_dir, repo) = temp_repo();
    let blob = sample_blob();

    // A directory in place of the sidecar file makes the read fail.
    let sidecar_file_name = format!("{}.bin", blob.to_hex());
    let redaction_path = repo
        .heddle_dir()
        .join("redactions")
        .join(sidecar_file_name);
    std::fs::create_dir_all(&redaction_path).expect("directory at redaction path");

    let outcome = redaction_push_message(&repo, redaction_info(blob));
    let err = outcome.expect_err("load error");

    assert!(matches!(err, ProtocolError::InvalidState(_)));
    let expected_fragment = format!("load redactions sidecar for {}:", blob.to_hex());
    assert!(
        err.to_string().contains(&expected_fragment),
        "unexpected error: {err}"
    );
}
3930
/// The StateVisibility push message must carry the state's full id string and
/// the exact sidecar bytes stored for that state.
#[test]
fn state_visibility_push_message_uses_state_keyed_sidecar_payload() {
    let (_dir, repo) = temp_repo();
    let state = ChangeId::from_bytes([17u8; 16]);
    repo.put_state_visibility(sample_state_visibility(state))
        .expect("put state visibility");
    let stored_sidecar = repo
        .get_state_visibility_bytes_for_state(&state)
        .expect("load sidecar");
    let expected_bytes = stored_sidecar.expect("sidecar exists");

    let info = state_visibility_info(state);
    let message = state_visibility_push_message(&repo, info).expect("message");

    let transfer = match message.body {
        Some(push_message::Body::StateVisibility(transfer)) => transfer,
        _ => panic!("expected state visibility transfer"),
    };
    assert_eq!(transfer.state_id, state.to_string_full());
    assert_eq!(transfer.state_visibility_blob, expected_bytes);
}
3951
3952 fn sample_attribution() -> Attribution {
3955 Attribution::human(Principal {
3956 name: "Grace Hopper".into(),
3957 email: "grace@example.com".into(),
3958 })
3959 }
3960
3961 fn put_state_with_blob(
3964 repo: &Repository,
3965 contents: &str,
3966 parents: Vec<ChangeId>,
3967 ) -> ChangeId {
3968 let blob = Blob::from(contents);
3969 let blob_hash = repo.store().put_blob(&blob).expect("put blob");
3970 let tree = Tree::from_entries(vec![
3971 TreeEntry::file(format!("{contents}.txt"), blob_hash, false).expect("tree entry"),
3972 ]);
3973 let tree_hash = repo.store().put_tree(&tree).expect("put tree");
3974 let state = State::new_snapshot(tree_hash, parents, sample_attribution());
3975 let state_id = state.change_id;
3976 repo.store().put_state(&state).expect("put state");
3977 state_id
3978 }
3979
/// A "main" thread head whose blob, tree and state were all stored locally is
/// returned as the advertised head.
#[test]
fn locally_complete_pull_head_advertises_fully_present_thread_head() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let main_thread = ThreadName::from("main");
    repo.refs()
        .set_thread(&main_thread, &head)
        .expect("set thread");

    let outcome = locally_complete_pull_head(&repo, "main", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised,
        Some(head),
        "a thread head whose full closure is present locally must be advertised"
    );
}
3996
/// Asking about the thread "nonexistent" in an unseeded repository advertises nothing.
#[test]
fn locally_complete_pull_head_skips_unknown_thread() {
    let (_dir, repo) = temp_repo_unseeded();

    let outcome = locally_complete_pull_head(&repo, "nonexistent", None);

    let advertised = outcome.expect("completeness check");
    assert_eq!(advertised, None);
}
4007
/// Supplying an explicit target state suppresses the advertisement even though
/// the thread head is present locally.
#[test]
fn locally_complete_pull_head_skips_when_target_state_override_in_play() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let main_thread = ThreadName::from("main");
    repo.refs()
        .set_thread(&main_thread, &head)
        .expect("set thread");

    let target_override = Some(head);
    let outcome = locally_complete_pull_head(&repo, "main", target_override);

    let advertised = outcome.expect("completeness check");
    assert_eq!(advertised, None);
}
4022
/// Once the repository has a recorded missing blob, no head is advertised even
/// though the thread head itself was stored completely.
#[test]
fn locally_complete_pull_head_skips_repo_with_missing_blobs() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let main_thread = ThreadName::from("main");
    repo.refs()
        .set_thread(&main_thread, &head)
        .expect("set thread");

    // Register a blob hash as missing; it is unrelated to the head's tree.
    let absent_blob = ContentHash::from_bytes([88u8; 32]);
    repo.record_missing_blob(absent_blob)
        .expect("record missing blob");

    let outcome = locally_complete_pull_head(&repo, "main", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised, None,
        "a repo carrying missing blobs must never advertise a head"
    );
}
4044
/// A head that names a parent state which was never stored must not be advertised.
#[test]
fn locally_complete_pull_head_skips_head_with_incomplete_closure() {
    let (_dir, repo) = temp_repo();

    // The parent id is freshly generated and never written to the store.
    let absent_parent = ChangeId::generate();
    let blob_hash = repo
        .store()
        .put_blob(&Blob::from("orphan"))
        .expect("put blob");
    let entry = TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry");
    let tree_hash = repo
        .store()
        .put_tree(&Tree::from_entries(vec![entry]))
        .expect("put tree");
    let orphan = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
    let orphan_id = orphan.change_id;
    repo.store().put_state(&orphan).expect("put child state");
    let main_thread = ThreadName::from("main");
    repo.refs()
        .set_thread(&main_thread, &orphan_id)
        .expect("set thread");

    let outcome = locally_complete_pull_head(&repo, "main", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised, None,
        "a head whose closure has an absent parent state must not be advertised"
    );
}
4074
/// A "feature" thread head whose blob, tree and state were all stored locally
/// is returned as the advertised head.
#[test]
fn locally_complete_local_thread_head_advertises_fully_present_thread_head() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let feature_thread = ThreadName::from("feature");
    repo.refs()
        .set_thread(&feature_thread, &head)
        .expect("set thread");

    let outcome = locally_complete_local_thread_head(&repo, "feature", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised,
        Some(head),
        "an explicit local-thread head whose full closure is present must be advertised"
    );
}
4093
/// Asking about the local thread "nonexistent" in an unseeded repository
/// advertises nothing.
#[test]
fn locally_complete_local_thread_head_skips_unknown_thread() {
    let (_dir, repo) = temp_repo_unseeded();

    let outcome = locally_complete_local_thread_head(&repo, "nonexistent", None);

    let advertised = outcome.expect("completeness check");
    assert_eq!(advertised, None);
}
4103
/// Supplying an explicit target state suppresses the advertisement even though
/// the "feature" head is present locally.
#[test]
fn locally_complete_local_thread_head_skips_when_target_state_override_in_play() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let feature_thread = ThreadName::from("feature");
    repo.refs()
        .set_thread(&feature_thread, &head)
        .expect("set thread");

    let target_override = Some(head);
    let outcome = locally_complete_local_thread_head(&repo, "feature", target_override);

    let advertised = outcome.expect("completeness check");
    assert_eq!(advertised, None);
}
4118
/// Once the repository has a recorded missing blob, no explicit local-thread
/// head is advertised even though the head itself was stored completely.
#[test]
fn locally_complete_local_thread_head_skips_repo_with_missing_blobs() {
    let (_dir, repo) = temp_repo();
    let head = put_state_with_blob(&repo, "alpha", vec![]);
    let feature_thread = ThreadName::from("feature");
    repo.refs()
        .set_thread(&feature_thread, &head)
        .expect("set thread");

    // Register a blob hash as missing; it is unrelated to the head's tree.
    let absent_blob = ContentHash::from_bytes([88u8; 32]);
    repo.record_missing_blob(absent_blob)
        .expect("record missing blob");

    let outcome = locally_complete_local_thread_head(&repo, "feature", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised, None,
        "a repo carrying missing blobs must never advertise an explicit local-thread head"
    );
}
4138
/// A "feature" head that names a parent state which was never stored must not
/// be advertised.
#[test]
fn locally_complete_local_thread_head_skips_head_with_incomplete_closure() {
    let (_dir, repo) = temp_repo();

    // The parent id is freshly generated and never written to the store.
    let absent_parent = ChangeId::generate();
    let blob_hash = repo
        .store()
        .put_blob(&Blob::from("orphan"))
        .expect("put blob");
    let entry = TreeEntry::file("orphan.txt", blob_hash, false).expect("tree entry");
    let tree_hash = repo
        .store()
        .put_tree(&Tree::from_entries(vec![entry]))
        .expect("put tree");
    let orphan = State::new_snapshot(tree_hash, vec![absent_parent], sample_attribution());
    let orphan_id = orphan.change_id;
    repo.store().put_state(&orphan).expect("put child state");
    let feature_thread = ThreadName::from("feature");
    repo.refs()
        .set_thread(&feature_thread, &orphan_id)
        .expect("set thread");

    let outcome = locally_complete_local_thread_head(&repo, "feature", None);
    let advertised = outcome.expect("completeness check");

    assert_eq!(
        advertised, None,
        "an explicit local-thread head whose closure has an absent parent must not be advertised"
    );
}
4167
/// Test double implementing `RepoSyncService` whose `pull` handler chooses
/// what to serve based on the `exclude_states` list in the client's request,
/// and records that list for later inspection.
#[derive(Clone)]
struct DeltaAwarePullService {
    /// State reported as the remote head in the Ready and Complete messages.
    remote_state: ChangeId,
    /// Objects advertised when the request excludes neither `remote_state`
    /// nor `known_parent`.
    full_closure: Vec<ObjectInfo>,
    /// Objects advertised when the request excludes `known_parent` (but not
    /// `remote_state`).
    delta_objects: Vec<ObjectInfo>,
    /// State whose presence in `exclude_states` selects the delta response.
    known_parent: ChangeId,
    /// Pack streamed together with `full_closure`.
    full_pack: wire::NativePackBundle,
    /// Pack streamed together with `delta_objects`.
    delta_pack: wire::NativePackBundle,
    /// `exclude_states` captured from the most recent pull request; shared
    /// across clones of the service through the `Arc`.
    captured_exclude: std::sync::Arc<std::sync::Mutex<Option<Vec<String>>>>,
}
4188
// Scripted `RepoSyncService` for the delta-aware pull tests. Only `pull` has
// real behaviour; the other RPCs return defaults or an empty stream.
#[tonic::async_trait]
impl RepoSyncService for DeltaAwarePullService {
    /// Returns a default (empty) `ListRefsResponse`.
    async fn list_refs(
        &self,
        _request: tonic::Request<ListRefsRequest>,
    ) -> Result<Response<ListRefsResponse>, Status> {
        Ok(Response::new(ListRefsResponse::default()))
    }

    /// Returns a default `UpdateRefResponse` without touching any ref.
    async fn update_ref(
        &self,
        _request: tonic::Request<UpdateRefRequest>,
    ) -> Result<Response<UpdateRefResponse>, Status> {
        Ok(Response::new(UpdateRefResponse::default()))
    }

    type PushStream = tokio_stream::wrappers::ReceiverStream<Result<PushMessage, Status>>;

    /// Returns a response stream that never carries a message: the sender half
    /// is dropped when this method returns.
    async fn push(
        &self,
        _request: tonic::Request<tonic::Streaming<PushMessage>>,
    ) -> Result<Response<Self::PushStream>, Status> {
        let (_tx, rx) = mpsc::channel(1);
        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }

    type PullStream = tokio_stream::wrappers::ReceiverStream<Result<PullMessage, Status>>;

    /// Runs the scripted pull exchange on a spawned task:
    /// request -> ready -> want -> optional pack chunks -> complete.
    ///
    /// The request's `exclude_states` is stored in `captured_exclude` and
    /// selects which object list and pack (if any) are served. Any unexpected
    /// inbound message is answered with `invalid_argument` and ends the
    /// exchange.
    async fn pull(
        &self,
        request: tonic::Request<tonic::Streaming<PullMessage>>,
    ) -> Result<Response<Self::PullStream>, Status> {
        let svc = self.clone();
        let (tx, rx) = mpsc::channel(16);

        tokio::spawn(async move {
            let mut inbound = request.into_inner();
            // Step 1: the first inbound message must be a pull request; keep
            // its exclude list.
            let exclude = match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Request(req)),
                })) => req.exclude_states,
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected pull request, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            };
            // Publish what the client advertised so the test can assert on it.
            *svc.captured_exclude.lock().unwrap() = Some(exclude.clone());

            let remote_full = svc.remote_state.to_string_full();
            let parent_full = svc.known_parent.to_string_full();

            // Pick the response:
            // - remote head excluded  -> nothing to send (short circuit),
            // - known parent excluded -> delta objects and delta pack,
            // - otherwise             -> full closure and full pack.
            let (objects, pack, short_circuit) = if exclude.contains(&remote_full) {
                (Vec::new(), None, true)
            } else if exclude.contains(&parent_full) {
                (svc.delta_objects.clone(), Some(svc.delta_pack.clone()), false)
            } else {
                (svc.full_closure.clone(), Some(svc.full_pack.clone()), false)
            };

            let descriptors: Vec<_> = objects
                .iter()
                .map(|info| {
                    object_descriptor_with_status(
                        info,
                        ObjectAvailabilityStatus::Missing,
                        "requested by client",
                    )
                })
                .collect();

            // Step 2: advertise the selected objects as missing.
            let ready = PullMessage {
                body: Some(pull_message::Body::Ready(PullReady {
                    remote_state: remote_full.clone(),
                    objects_to_fetch: descriptors,
                    transfer: None,
                    partial_fetch_status: PartialFetchStatus::Disabled as i32,
                    missing_objects: Vec::new(),
                    full_closure_available: false,
                    object_count: objects.len() as u32,
                    remote_revision_address: native_revision_address(svc.remote_state),
                })),
            };
            if tx.send(Ok(ready)).await.is_err() {
                // The client went away; nothing more to do.
                return;
            }

            // Step 3: any Want message is accepted; its contents are not inspected.
            match inbound.message().await {
                Ok(Some(PullMessage {
                    body: Some(pull_message::Body::Want(_)),
                })) => {}
                other => {
                    let _ = tx
                        .send(Err(Status::invalid_argument(format!(
                            "expected want, got {other:?}"
                        ))))
                        .await;
                    return;
                }
            }

            // Step 4: stream the selected pack in 64-byte chunks; skipped in
            // the short-circuit case, where no pack was selected.
            if let Some(pack) = pack
                && !short_circuit
            {
                for message in encode_pull_native_pack_messages(&pack, "delta-aware-test", 64) {
                    if tx.send(Ok(message)).await.is_err() {
                        return;
                    }
                }
            }

            // Step 5: report successful completion at the remote head.
            let complete = PullMessage {
                body: Some(pull_message::Body::Complete(GrpcPullComplete {
                    success: true,
                    new_state: remote_full,
                    error: String::new(),
                    transfer: Some(TransferCheckpoint {
                        transfer_id: "delta-aware-test".to_string(),
                        transport_mode: TransportMode::NativePack as i32,
                        resume_offset: 0,
                        chunk_index: 0,
                        checkpoint: b"heddle-markers-v1\n".to_vec(),
                        is_complete: true,
                    }),
                    new_revision_address: native_revision_address(svc.remote_state),
                })),
            };
            let _ = tx.send(Ok(complete)).await;
        });

        Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(
            rx,
        )))
    }
}
4334
4335 async fn connect_delta_aware_service(
4336 service: DeltaAwarePullService,
4337 ) -> Option<(HostedGrpcClient, tokio::task::JoinHandle<()>)> {
4338 let listener = match tokio::net::TcpListener::bind(("127.0.0.1", 0)).await {
4339 Ok(listener) => listener,
4340 Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
4341 eprintln!("skipping hosted sync local gRPC test: TCP bind denied: {err}");
4342 return None;
4343 }
4344 Err(err) => panic!("bind test server: {err}"),
4345 };
4346 let addr = listener.local_addr().expect("local addr");
4347 let incoming = futures::stream::unfold(listener, |listener| async {
4348 match listener.accept().await {
4349 Ok((stream, _addr)) => Some((Ok::<_, std::io::Error>(stream), listener)),
4350 Err(err) => Some((Err(err), listener)),
4351 }
4352 });
4353 let handle = tokio::spawn(async move {
4354 Server::builder()
4355 .add_service(RepoSyncServiceServer::new(service))
4356 .serve_with_incoming(incoming)
4357 .await
4358 .expect("serve delta-aware test service");
4359 });
4360 let client = HostedGrpcClient::connect(addr, &ClientConfig::default())
4361 .await
4362 .expect("connect client");
4363 Some((client, handle))
4364 }
4365
4366 #[tokio::test]
4367 async fn warm_bare_pull_advertises_head_and_fires_zero_delta_short_circuit() {
4368 let (_dir, repo) = temp_repo();
4372 let head = put_state_with_blob(&repo, "tip", vec![]);
4373 repo.refs()
4374 .set_thread(&ThreadName::from("main"), &head)
4375 .expect("set thread");
4376
4377 let full_closure =
4378 wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
4379 let full_pack =
4380 wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
4381 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4382
4383 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4384 remote_state: head,
4385 full_closure,
4386 delta_objects: Vec::new(),
4387 known_parent: ChangeId::generate(),
4388 full_pack: full_pack.clone(),
4389 delta_pack: full_pack,
4390 captured_exclude: captured.clone(),
4391 })
4392 .await
4393 else {
4394 return;
4395 };
4396
4397 let exchange = tokio::time::timeout(
4398 Duration::from_secs(5),
4399 client.pull_exchange(
4400 &repo,
4401 "owner/repo",
4402 "main",
4403 PullOptions {
4404 local_thread: None,
4405 depth: None,
4406 target_state: None,
4407 materialization: PullMaterialization::Full,
4408 },
4409 ),
4410 )
4411 .await
4412 .expect("warm bare pull must not hang")
4413 .expect("warm bare pull succeeds");
4414 server.abort();
4415
4416 let advertised = captured.lock().unwrap().clone().expect("request captured");
4417 assert_eq!(
4418 advertised,
4419 vec![head.to_string_full()],
4420 "a bare pull at the tip must advertise that tip in exclude_states"
4421 );
4422 assert!(exchange.result.success);
4423 assert_eq!(
4424 exchange.object_count, 0,
4425 "the zero-delta short-circuit must transfer no objects, not the full closure"
4426 );
4427 }
4428
4429 #[tokio::test]
4430 async fn behind_client_bare_pull_receives_exactly_the_missing_delta() {
4431 let (_src_dir, src_repo) = temp_repo_unseeded();
4438 let parent = put_state_with_blob(&src_repo, "base", vec![]);
4439 let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
4440 let child_closure =
4441 wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
4442
4443 let (_dir, repo) = temp_repo_unseeded();
4447 let parent_closure =
4448 wire::enumerate_state_closure(src_repo.store(), parent).expect("parent closure");
4449 let parent_pack =
4450 wire::build_native_pack(src_repo.store(), &parent_closure).expect("parent pack");
4451 wire::install_received_pack(repo.store(), &parent_pack.pack_data, &parent_pack.index_data)
4452 .expect("install parent closure into client");
4453 repo.refs()
4454 .set_thread(&ThreadName::from("main"), &parent)
4455 .expect("set thread to parent");
4456 wire::enumerate_state_closure(repo.store(), parent).expect("client holds parent closure");
4458 assert!(
4460 repo.store().get_state(&child).expect("probe child").is_none(),
4461 "client must start without the child state"
4462 );
4463 let parent_clone = parent;
4464 let full_pack =
4465 wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
4466 let delta_objects = wire::enumerate_state_closure_with_options(
4469 src_repo.store(),
4470 child,
4471 wire::StateClosureOptions {
4472 depth: None,
4473 exclude_states: vec![parent_clone],
4474 },
4475 )
4476 .expect("delta closure");
4477 assert!(
4478 !delta_objects.is_empty() && delta_objects.len() < child_closure.len(),
4479 "delta must be a strict, non-empty subset of the full closure"
4480 );
4481 let delta_pack =
4482 wire::build_native_pack(src_repo.store(), &delta_objects).expect("delta pack");
4483
4484 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4485 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4486 remote_state: child,
4487 full_closure: child_closure.clone(),
4488 delta_objects: delta_objects.clone(),
4489 known_parent: parent_clone,
4490 full_pack,
4491 delta_pack,
4492 captured_exclude: captured.clone(),
4493 })
4494 .await
4495 else {
4496 return;
4497 };
4498
4499 let exchange = tokio::time::timeout(
4500 Duration::from_secs(5),
4501 client.pull_exchange(
4502 &repo,
4503 "owner/repo",
4504 "main",
4505 PullOptions {
4506 local_thread: None,
4507 depth: None,
4508 target_state: None,
4509 materialization: PullMaterialization::Full,
4510 },
4511 ),
4512 )
4513 .await
4514 .expect("behind-client pull must not hang")
4515 .expect("behind-client pull succeeds");
4516 server.abort();
4517
4518 let advertised = captured.lock().unwrap().clone().expect("request captured");
4519 assert_eq!(
4520 advertised,
4521 vec![parent.to_string_full()],
4522 "a behind client must advertise the parent head it holds"
4523 );
4524 assert!(exchange.result.success);
4525 let reassembled = wire::enumerate_state_closure(repo.store(), child)
4529 .expect("the client must hold the complete child closure after a delta pull");
4530 assert_eq!(
4531 reassembled.len(),
4532 child_closure.len(),
4533 "delta pull must leave the client with the full child closure, no gaps"
4534 );
4535 }
4536
4537 #[tokio::test]
4538 async fn fresh_bare_pull_advertises_nothing_and_gets_full_closure() {
4539 let (_dir, repo) = temp_repo_unseeded();
4543 let (_src_dir, src_repo) = temp_repo_unseeded();
4544 let remote = put_state_with_blob(&src_repo, "seed", vec![]);
4545 let full_closure =
4546 wire::enumerate_state_closure(src_repo.store(), remote).expect("closure");
4547 let full_pack =
4548 wire::build_native_pack(src_repo.store(), &full_closure).expect("full pack");
4549
4550 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4551 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4552 remote_state: remote,
4553 full_closure: full_closure.clone(),
4554 delta_objects: Vec::new(),
4555 known_parent: ChangeId::generate(),
4556 full_pack: full_pack.clone(),
4557 delta_pack: full_pack,
4558 captured_exclude: captured.clone(),
4559 })
4560 .await
4561 else {
4562 return;
4563 };
4564
4565 let exchange = tokio::time::timeout(
4566 Duration::from_secs(5),
4567 client.pull_exchange(
4568 &repo,
4569 "owner/repo",
4570 "main",
4571 PullOptions {
4572 local_thread: None,
4573 depth: None,
4574 target_state: None,
4575 materialization: PullMaterialization::Full,
4576 },
4577 ),
4578 )
4579 .await
4580 .expect("fresh bare pull must not hang")
4581 .expect("fresh bare pull succeeds");
4582 server.abort();
4583
4584 let advertised = captured.lock().unwrap().clone().expect("request captured");
4585 assert!(
4586 advertised.is_empty(),
4587 "a fresh repo must advertise no exclude_states"
4588 );
4589 assert!(exchange.result.success);
4590 assert_eq!(
4591 exchange.object_count,
4592 full_closure.len(),
4593 "a fresh pull must receive the full closure, nothing wrongly excluded"
4594 );
4595 assert!(
4596 wire::enumerate_state_closure(repo.store(), remote).is_ok(),
4597 "fresh pull must install the complete closure"
4598 );
4599 }
4600
4601 #[tokio::test]
4602 async fn explicit_local_thread_incomplete_closure_does_not_advertise_and_repairs() {
4603 let (_src_dir, src_repo) = temp_repo_unseeded();
4613 let parent = put_state_with_blob(&src_repo, "base", vec![]);
4614 let child = put_state_with_blob(&src_repo, "feature", vec![parent]);
4615 let child_closure =
4616 wire::enumerate_state_closure(src_repo.store(), child).expect("child closure");
4617 let full_pack =
4618 wire::build_native_pack(src_repo.store(), &child_closure).expect("full pack");
4619
4620 let (_dir, repo) = temp_repo_unseeded();
4625 let child_only = wire::enumerate_state_closure_with_options(
4626 src_repo.store(),
4627 child,
4628 wire::StateClosureOptions {
4629 depth: None,
4630 exclude_states: vec![parent],
4631 },
4632 )
4633 .expect("child-only objects");
4634 let child_only_pack =
4635 wire::build_native_pack(src_repo.store(), &child_only).expect("child-only pack");
4636 wire::install_received_pack(
4637 repo.store(),
4638 &child_only_pack.pack_data,
4639 &child_only_pack.index_data,
4640 )
4641 .expect("install child-only objects");
4642 repo.refs()
4643 .set_thread(&ThreadName::from("feature"), &child)
4644 .expect("set local thread to child");
4645 assert!(
4648 matches!(
4649 wire::enumerate_state_closure(repo.store(), child),
4650 Err(ProtocolError::ObjectNotFound(_))
4651 ),
4652 "the explicit thread head's closure must start incomplete"
4653 );
4654
4655 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4656 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4657 remote_state: child,
4658 full_closure: child_closure.clone(),
4659 delta_objects: Vec::new(),
4660 known_parent: parent,
4661 full_pack: full_pack.clone(),
4662 delta_pack: full_pack,
4663 captured_exclude: captured.clone(),
4664 })
4665 .await
4666 else {
4667 return;
4668 };
4669
4670 let exchange = tokio::time::timeout(
4671 Duration::from_secs(5),
4672 client.pull_exchange(
4673 &repo,
4674 "owner/repo",
4675 "main",
4676 PullOptions {
4677 local_thread: Some("feature"),
4678 depth: None,
4679 target_state: None,
4680 materialization: PullMaterialization::Full,
4681 },
4682 ),
4683 )
4684 .await
4685 .expect("explicit-thread pull must not hang")
4686 .expect("explicit-thread pull succeeds");
4687 server.abort();
4688
4689 let advertised = captured.lock().unwrap().clone().expect("request captured");
4690 assert!(
4691 advertised.is_empty(),
4692 "an explicit --local-thread with an incomplete closure must advertise nothing, \
4693 falling back to a full pull (got {advertised:?})"
4694 );
4695 assert!(exchange.result.success);
4696 let reassembled = wire::enumerate_state_closure(repo.store(), child)
4698 .expect("the client must hold the complete child closure after the fallback full pull");
4699 assert_eq!(
4700 reassembled.len(),
4701 child_closure.len(),
4702 "the full-pull fallback must leave the client with the complete closure, no gaps"
4703 );
4704 }
4705
4706 #[tokio::test]
4707 async fn explicit_local_thread_complete_closure_advertises_and_fires_short_circuit() {
4708 let (_dir, repo) = temp_repo();
4713 let head = put_state_with_blob(&repo, "tip", vec![]);
4714 repo.refs()
4715 .set_thread(&ThreadName::from("feature"), &head)
4716 .expect("set local thread");
4717 wire::enumerate_state_closure(repo.store(), head)
4719 .expect("client holds the complete thread closure");
4720
4721 let full_closure =
4722 wire::enumerate_state_closure(repo.store(), head).expect("enumerate closure");
4723 let full_pack =
4724 wire::build_native_pack(repo.store(), &full_closure).expect("build full pack");
4725 let captured = std::sync::Arc::new(std::sync::Mutex::new(None));
4726
4727 let Some((mut client, server)) = connect_delta_aware_service(DeltaAwarePullService {
4728 remote_state: head,
4729 full_closure,
4730 delta_objects: Vec::new(),
4731 known_parent: ChangeId::generate(),
4732 full_pack: full_pack.clone(),
4733 delta_pack: full_pack,
4734 captured_exclude: captured.clone(),
4735 })
4736 .await
4737 else {
4738 return;
4739 };
4740
4741 let exchange = tokio::time::timeout(
4742 Duration::from_secs(5),
4743 client.pull_exchange(
4744 &repo,
4745 "owner/repo",
4746 "main",
4747 PullOptions {
4748 local_thread: Some("feature"),
4749 depth: None,
4750 target_state: None,
4751 materialization: PullMaterialization::Full,
4752 },
4753 ),
4754 )
4755 .await
4756 .expect("explicit-thread pull must not hang")
4757 .expect("explicit-thread pull succeeds");
4758 server.abort();
4759
4760 let advertised = captured.lock().unwrap().clone().expect("request captured");
4761 assert_eq!(
4762 advertised,
4763 vec![head.to_string_full()],
4764 "an explicit --local-thread with a complete closure must advertise its head"
4765 );
4766 assert!(exchange.result.success);
4767 assert_eq!(
4768 exchange.object_count, 0,
4769 "advertising the complete head must fire the zero-delta short-circuit, not a full pull"
4770 );
4771 }
4772}