1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7 GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9 ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata, ThreadVerificationSummary,
10 TransportMode, UpdateRefRequest, WantObjects, pull_message, push_message,
11};
12use objects::{
13 object::{ChangeId, ContentHash},
14 store::PackObjectId,
15};
16use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
17use repo::{Repository, SyncedThreadMetadata, ThreadManager};
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Request;
21
22use super::{
23 HostedGrpcClient, PullMaterialization,
24 helpers::{
25 descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
26 status_to_protocol_error, to_proto_object_info, transport_mode_name,
27 },
28};
29
30#[derive(Clone, Copy)]
31struct PullOptions<'a> {
32 local_thread: Option<&'a str>,
33 depth: Option<u32>,
34 target_state: Option<ChangeId>,
35 materialization: PullMaterialization,
36}
37
38struct PullWantPlan {
39 wants: Vec<ObjectDescriptor>,
40 wanted_types: HashMap<PackObjectId, ObjectType>,
41 want_full_closure: bool,
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct PullObjectMix {
46 pub blobs: usize,
47 pub trees: usize,
48 pub states: usize,
49 pub actions: usize,
50}
51
52impl PullObjectMix {
53 fn record(&mut self, obj_type: ObjectType) {
54 match obj_type {
55 ObjectType::Blob => self.blobs += 1,
56 ObjectType::Tree => self.trees += 1,
57 ObjectType::State => self.states += 1,
58 ObjectType::Action => self.actions += 1,
59 }
60 }
61
62 pub fn total(&self) -> usize {
63 self.blobs + self.trees + self.states + self.actions
64 }
65}
66
67#[derive(Debug, Clone, Default)]
68pub struct PullProfile {
69 pub ready_wait: Duration,
70 pub receive_and_apply: Duration,
71 pub decode: Duration,
72 pub store_receive_object: Duration,
73 pub metadata_sync: Duration,
74 pub pack_decode_apply: Duration,
75 pub raw_decode_apply: Duration,
76 pub pack_decode: Duration,
77 pub raw_decode: Duration,
78 pub bytes_received: usize,
79 pub pack_bytes_received: usize,
80 pub raw_bytes_received: usize,
81 pub objects_received: usize,
82 pub object_mix: PullObjectMix,
83}
84
85impl HostedGrpcClient {
86 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
87 let mut request = Request::new(ListRefsRequest {
88 repo_path: repo_path.to_string(),
89 });
90 self.apply_auth(&mut request)?;
91 let response = self
92 .inner
93 .list_refs(request)
94 .await
95 .map_err(status_to_protocol_error)?
96 .into_inner();
97 response
98 .refs
99 .into_iter()
100 .map(|entry| {
101 Ok(RefEntry {
102 name: entry.name,
103 change_id: ChangeId::try_from_slice(&entry.change_id)
104 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
105 is_thread: entry.is_thread,
106 })
107 })
108 .collect()
109 }
110
111 #[allow(clippy::too_many_arguments)]
112 pub async fn update_ref(
113 &mut self,
114 repo_path: &str,
115 name: &str,
116 is_thread: bool,
117 old_value: Option<ChangeId>,
118 new_value: ChangeId,
119 force: bool,
120 thread_metadata: Option<&SyncedThreadMetadata>,
121 ) -> Result<RefUpdated, ProtocolError> {
122 let mut request = Request::new(UpdateRefRequest {
123 repo_path: repo_path.to_string(),
124 name: name.to_string(),
125 is_thread,
126 force,
127 old_value: old_value
128 .map(|value| value.to_string_full())
129 .unwrap_or_default(),
130 new_value: new_value.to_string_full(),
131 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
132 client_operation_id: String::new(),
133 });
134 self.apply_auth(&mut request)?;
135 let response = self
136 .inner
137 .update_ref(request)
138 .await
139 .map_err(status_to_protocol_error)?
140 .into_inner();
141 Ok(RefUpdated {
142 success: response.success,
143 old_value: if response.old_value.is_empty() {
144 None
145 } else {
146 Some(
147 ChangeId::parse(&response.old_value)
148 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
149 )
150 },
151 error: (!response.error.is_empty()).then_some(response.error),
152 })
153 }
154
155 pub async fn push(
156 &mut self,
157 repo: &Repository,
158 repo_path: &str,
159 local_state: ChangeId,
160 target_thread: &str,
161 force: bool,
162 ) -> Result<PushComplete, ProtocolError> {
163 let _ = self.transport.chunk_size;
164 let _ = self.transport.resume_attempts;
165 let _ = self.transport.negotiated.chunk_size();
166 let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
167 let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
168 let transport_mode = preferred_transport_mode(&self.transport, objects.len());
169 let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
170 let request_message = PushMessage {
171 body: Some(push_message::Body::Request(PushRequest {
172 repo_path: repo_path.to_string(),
173 local_state: local_state.to_string_full(),
174 target_thread: target_thread.to_string(),
175 create_thread: true,
176 force,
177 objects: objects.iter().map(to_proto_object_info).collect(),
178 transfer: Some(self.transport.transfer_checkpoint_with_mode(
179 transfer_id.clone(),
180 transport_mode,
181 0,
182 0,
183 false,
184 )),
185 partial_fetch_status: partial_fetch_status_for_repo(repo),
186 allow_partial_fetch: true,
187 thread_metadata: thread_metadata
188 .map(|metadata| to_proto_thread_metadata(&metadata)),
189 client_operation_id: String::new(),
190 })),
191 };
192
193 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
194 tx.send(request_message).await.map_err(|_| {
195 ProtocolError::InvalidState("failed to initialize push stream".to_string())
196 })?;
197 let mut request = Request::new(ReceiverStream::new(rx));
198 self.apply_auth(&mut request)?;
199 let mut response = self
200 .inner
201 .push(request)
202 .await
203 .map_err(status_to_protocol_error)?
204 .into_inner();
205
206 let ready = match response.message().await.map_err(status_to_protocol_error)? {
207 Some(PushMessage {
208 body: Some(push_message::Body::Ready(ready)),
209 }) => ready,
210 _ => {
211 return Err(ProtocolError::InvalidState(
212 "expected PushReady from gRPC server".to_string(),
213 ));
214 }
215 };
216
217 let object_index = objects
218 .into_iter()
219 .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
220 .collect::<HashMap<_, _>>();
221
222 let ready_transport_mode = ready
223 .transfer
224 .as_ref()
225 .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
226 .unwrap_or(transport_mode);
227 let wanted_infos = ready
228 .want_objects
229 .into_iter()
230 .map(|want| {
231 object_index
232 .get(&descriptor_id(&want))
233 .cloned()
234 .ok_or_else(|| {
235 ProtocolError::InvalidState("server requested unknown object".to_string())
236 })
237 })
238 .collect::<Result<Vec<_>, _>>()?;
239
240 if !wanted_infos.is_empty() {
241 let bundle = proto::build_native_pack(repo.store(), &wanted_infos)?;
242 for message in encode_native_pack_messages(
243 &bundle,
244 &transfer_id,
245 self.transport.chunk_size.max(1),
246 &self.transport,
247 ready_transport_mode,
248 )? {
249 tx.send(message).await.map_err(|_| {
250 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
251 })?;
252 }
253 }
254 drop(tx);
255
256 let result = match response.message().await.map_err(status_to_protocol_error)? {
257 Some(PushMessage {
258 body: Some(push_message::Body::Complete(complete)),
259 }) => PushComplete {
260 success: complete.success,
261 new_state: if complete.new_state.is_empty() {
262 None
263 } else {
264 Some(
265 ChangeId::parse(&complete.new_state)
266 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
267 )
268 },
269 error: (!complete.error.is_empty()).then_some(complete.error),
270 transfer_id: complete
271 .transfer
272 .as_ref()
273 .map(|transfer| transfer.transfer_id.clone())
274 .unwrap_or_default(),
275 transport_mode: complete
276 .transfer
277 .as_ref()
278 .map(|transfer| transport_mode_name(transfer.transport_mode))
279 .unwrap_or("raw-objects")
280 .to_string(),
281 resume_offset: complete
282 .transfer
283 .as_ref()
284 .map(|transfer| transfer.resume_offset)
285 .unwrap_or_default(),
286 chunk_index: complete
287 .transfer
288 .as_ref()
289 .map(|transfer| transfer.chunk_index)
290 .unwrap_or_default(),
291 checkpoint: complete
292 .transfer
293 .as_ref()
294 .map(|transfer| transfer.checkpoint.clone())
295 .unwrap_or_default(),
296 is_complete: complete
297 .transfer
298 .as_ref()
299 .map(|transfer| transfer.is_complete)
300 .unwrap_or(false),
301 },
302 _ => {
303 return Err(ProtocolError::InvalidState(
304 "expected PushComplete from gRPC server".to_string(),
305 ));
306 }
307 };
308
309 if result.success {
310 self.sync_remote_markers(repo, repo_path, local_state)
311 .await?;
312 }
313 Ok(result)
314 }
315
316 pub async fn pull(
317 &mut self,
318 repo: &Repository,
319 repo_path: &str,
320 remote_thread: &str,
321 local_thread: Option<&str>,
322 ) -> Result<PullComplete, ProtocolError> {
323 self.pull_with_options(
324 repo,
325 repo_path,
326 remote_thread,
327 PullOptions {
328 local_thread,
329 depth: None,
330 target_state: None,
331 materialization: PullMaterialization::Full,
332 },
333 )
334 .await
335 }
336
337 pub async fn pull_profiled(
338 &mut self,
339 repo: &Repository,
340 repo_path: &str,
341 remote_thread: &str,
342 local_thread: Option<&str>,
343 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
344 self.pull_exchange(
345 repo,
346 repo_path,
347 remote_thread,
348 PullOptions {
349 local_thread,
350 depth: None,
351 target_state: None,
352 materialization: PullMaterialization::Full,
353 },
354 )
355 .await
356 .map(|exchange| (exchange.result, exchange.profile))
357 }
358
359 pub async fn pull_partial(
360 &mut self,
361 repo: &Repository,
362 repo_path: &str,
363 remote_thread: &str,
364 local_thread: Option<&str>,
365 ) -> Result<PullComplete, ProtocolError> {
366 self.pull_with_options(
367 repo,
368 repo_path,
369 remote_thread,
370 PullOptions {
371 local_thread,
372 depth: None,
373 target_state: None,
374 materialization: PullMaterialization::Lazy,
375 },
376 )
377 .await
378 }
379
380 pub async fn pull_with_depth_and_materialization(
381 &mut self,
382 repo: &Repository,
383 repo_path: &str,
384 remote_thread: &str,
385 local_thread: Option<&str>,
386 depth: Option<u32>,
387 materialization: PullMaterialization,
388 ) -> Result<PullComplete, ProtocolError> {
389 self.pull_with_options(
390 repo,
391 repo_path,
392 remote_thread,
393 PullOptions {
394 local_thread,
395 depth,
396 target_state: None,
397 materialization,
398 },
399 )
400 .await
401 }
402
403 pub async fn pull_with_depth(
404 &mut self,
405 repo: &Repository,
406 repo_path: &str,
407 remote_thread: &str,
408 local_thread: Option<&str>,
409 depth: Option<u32>,
410 ) -> Result<PullComplete, ProtocolError> {
411 self.pull_with_depth_and_materialization(
412 repo,
413 repo_path,
414 remote_thread,
415 local_thread,
416 depth,
417 PullMaterialization::Full,
418 )
419 .await
420 }
421
422 pub async fn fetch_state(
423 &mut self,
424 repo: &Repository,
425 repo_path: &str,
426 remote_thread: &str,
427 target_state: ChangeId,
428 ) -> Result<usize, ProtocolError> {
429 self.pull_exchange(
430 repo,
431 repo_path,
432 remote_thread,
433 PullOptions {
434 local_thread: None,
435 depth: None,
436 target_state: Some(target_state),
437 materialization: PullMaterialization::Full,
438 },
439 )
440 .await
441 .map(|exchange| exchange.object_count)
442 }
443
444 pub async fn fetch_state_partial(
445 &mut self,
446 repo: &Repository,
447 repo_path: &str,
448 remote_thread: &str,
449 target_state: ChangeId,
450 ) -> Result<usize, ProtocolError> {
451 self.pull_exchange(
452 repo,
453 repo_path,
454 remote_thread,
455 PullOptions {
456 local_thread: None,
457 depth: None,
458 target_state: Some(target_state),
459 materialization: PullMaterialization::Lazy,
460 },
461 )
462 .await
463 .map(|exchange| exchange.object_count)
464 }
465
466 pub async fn hydrate_blob_at_path(
467 &mut self,
468 repo: &Repository,
469 repo_path: &str,
470 reference: &str,
471 path: &str,
472 ) -> Result<objects::object::Blob, ProtocolError> {
473 let mut request = Request::new(GetBlobRequest {
474 repo_path: repo_path.to_string(),
475 r#ref: reference.to_string(),
476 path: path.to_string(),
477 });
478 self.apply_auth(&mut request)?;
479 let response = self
480 .content
481 .get_blob(request)
482 .await
483 .map_err(status_to_protocol_error)?
484 .into_inner();
485
486 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
487 let blob = objects::object::Blob::new(content);
488 repo.store().put_blob(&blob)?;
489 repo.clear_missing_blob(&blob.hash())?;
490 Ok(blob)
491 }
492
493 pub async fn hydrate_missing_blobs_for_state(
494 &mut self,
495 repo: &Repository,
496 repo_path: &str,
497 remote_thread: &str,
498 target_state: ChangeId,
499 ) -> Result<usize, ProtocolError> {
500 let exchange = self
501 .pull_exchange(
502 repo,
503 repo_path,
504 remote_thread,
505 PullOptions {
506 local_thread: None,
507 depth: None,
508 target_state: Some(target_state),
509 materialization: PullMaterialization::Full,
510 },
511 )
512 .await?;
513 clear_missing_blobs_for_state(repo, target_state)?;
514 Ok(exchange.object_count)
515 }
516
517 async fn pull_with_options(
518 &mut self,
519 repo: &Repository,
520 repo_path: &str,
521 remote_thread: &str,
522 options: PullOptions<'_>,
523 ) -> Result<PullComplete, ProtocolError> {
524 self.pull_exchange(repo, repo_path, remote_thread, options)
525 .await
526 .map(|exchange| exchange.result)
527 }
528
529 async fn pull_exchange(
530 &mut self,
531 repo: &Repository,
532 repo_path: &str,
533 remote_thread: &str,
534 options: PullOptions<'_>,
535 ) -> Result<PullExchange, ProtocolError> {
536 let exchange_start = Instant::now();
537 let mut exclude_states = Vec::new();
538 if let Some(local_thread) = options.local_thread
539 && let Some(head) = repo.refs().get_thread(local_thread)?
540 {
541 exclude_states.push(head);
542 }
543 let allow_partial_fetch = options.materialization.allows_partial_fetch();
544 let fresh_full_pull =
545 supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
546
547 let transfer_id = pull_transfer_id(
548 repo_path,
549 remote_thread,
550 options.local_thread,
551 options.depth,
552 options.target_state,
553 );
554 let request_message = PullMessage {
555 body: Some(pull_message::Body::Request(PullRequest {
556 repo_path: repo_path.to_string(),
557 remote_thread: remote_thread.to_string(),
558 local_thread: options.local_thread.unwrap_or_default().to_string(),
559 target_state: options
560 .target_state
561 .map(|value| value.to_string_full())
562 .unwrap_or_default(),
563 depth: options.depth.unwrap_or_default(),
564 exclude_states: exclude_states
565 .iter()
566 .map(ChangeId::to_string_full)
567 .collect(),
568 transfer: Some(self.transport.transfer_checkpoint_with_mode(
569 transfer_id.clone(),
570 TransportMode::NativePack,
571 0,
572 0,
573 false,
574 )),
575 partial_fetch_status: partial_fetch_status_for_repo(repo),
576 allow_partial_fetch,
577 fresh_full_pull,
578 client_operation_id: String::new(),
579 })),
580 };
581
582 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
583 tx.send(request_message).await.map_err(|_| {
584 ProtocolError::InvalidState("failed to initialize pull stream".to_string())
585 })?;
586 let mut request = Request::new(ReceiverStream::new(rx));
587 self.apply_auth(&mut request)?;
588 let mut response = self
589 .inner
590 .pull(request)
591 .await
592 .map_err(status_to_protocol_error)?
593 .into_inner();
594
595 let ready = match response.message().await.map_err(status_to_protocol_error)? {
596 Some(PullMessage {
597 body: Some(pull_message::Body::Ready(ready)),
598 }) => ready,
599 _ => {
600 return Err(ProtocolError::InvalidState(
601 "expected PullReady from gRPC server".to_string(),
602 ));
603 }
604 };
605 let mut profile = PullProfile {
606 ready_wait: exchange_start.elapsed(),
607 ..PullProfile::default()
608 };
609 let remote_state = ChangeId::parse(&ready.remote_state)
610 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
611 let PullWantPlan {
612 wants,
613 wanted_types,
614 want_full_closure,
615 } = plan_pull_wants(
616 repo,
617 &remote_state,
618 ready.full_closure_available,
619 ready.objects_to_fetch,
620 allow_partial_fetch,
621 )?;
622
623 tx.send(PullMessage {
624 body: Some(pull_message::Body::Want(WantObjects {
625 objects: wants.clone(),
626 want_full_closure,
627 transfer: Some(self.transport.transfer_checkpoint_with_mode(
628 transfer_id.clone(),
629 TransportMode::NativePack,
630 0,
631 0,
632 false,
633 )),
634 })),
635 })
636 .await
637 .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
638 drop(tx);
639
640 let receive_start = Instant::now();
641 let mut pack_state = proto::PackChunkState::default();
642 let mut received = 0usize;
643 while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
644 match message.body {
645 Some(pull_message::Body::Pack(chunk)) => {
646 profile.bytes_received =
647 profile.bytes_received.saturating_add(chunk.data.len());
648 profile.pack_bytes_received =
649 profile.pack_bytes_received.saturating_add(chunk.data.len());
650 let transfer = chunk.transfer.as_ref().ok_or_else(|| {
651 ProtocolError::InvalidState(
652 "native pack chunk missing transfer checkpoint".to_string(),
653 )
654 })?;
655 let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
656 .unwrap_or(PackStreamKind::Unspecified);
657 if stream_kind == PackStreamKind::Unspecified {
658 return Err(ProtocolError::InvalidState(
659 "native pack chunk missing stream kind".to_string(),
660 ));
661 }
662 let decode_start = Instant::now();
663 proto::receive_pack_chunk(
664 &mut pack_state,
665 stream_kind == PackStreamKind::Index,
666 transfer.resume_offset,
667 transfer.chunk_index,
668 transfer.is_complete,
669 &chunk.data,
670 chunk.is_final_chunk,
671 )?;
672 let decode_elapsed = decode_start.elapsed();
673 profile.pack_decode += decode_elapsed;
674 profile.pack_decode_apply += decode_elapsed;
675 profile.decode += decode_elapsed;
676 }
677 Some(pull_message::Body::Complete(complete)) => {
678 profile.receive_and_apply = receive_start.elapsed();
679 let final_state = if complete.new_state.is_empty() {
680 None
681 } else {
682 Some(
683 ChangeId::parse(&complete.new_state)
684 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
685 )
686 };
687
688 if complete.success {
689 if want_full_closure || !wants.is_empty() {
690 if !pack_state.is_complete() {
691 return Err(ProtocolError::InvalidState(
692 "pull completed before native pack stream finished".to_string(),
693 ));
694 }
695 let store_start = Instant::now();
696 let installed_ids = proto::install_received_pack(
697 repo.store(),
698 &pack_state.pack_data,
699 &pack_state.index_data,
700 )?;
701 profile.store_receive_object += store_start.elapsed();
702 received = installed_ids.len();
703 for id in installed_ids {
704 match (id, wanted_types.get(&id).copied()) {
705 (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
706 profile.object_mix.record(ObjectType::Blob);
707 repo.clear_missing_blob(&hash)?;
708 }
709 (_, Some(obj_type)) => {
710 profile.object_mix.record(obj_type);
711 }
712 (PackObjectId::ChangeId(_), None) => {
713 profile.object_mix.record(ObjectType::State);
714 }
715 (PackObjectId::Hash(hash), None) => {
716 let inferred =
717 infer_installed_hash_object_type(repo, &hash)?;
718 profile.object_mix.record(inferred);
719 }
720 }
721 }
722 }
723
724 let metadata_start = Instant::now();
725 if let Some(local_thread) = options.local_thread
726 && let Some(state) = final_state
727 {
728 repo.refs().set_thread(local_thread, &state)?;
729 }
730 if let Some(state) = final_state
731 && allow_partial_fetch
732 {
733 mark_missing_blobs_for_state(repo, state)?;
734 } else if final_state.is_some() {
735 let _ = repo.clear_all_missing_blobs()?;
736 }
737 let synced_markers = complete
738 .transfer
739 .as_ref()
740 .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
741 .transpose()?
742 .unwrap_or(false);
743 if !synced_markers {
744 self.sync_local_markers(repo, repo_path).await?;
745 }
746 profile.metadata_sync = metadata_start.elapsed();
747 profile.objects_received = received;
748 return Ok(PullExchange {
749 result: PullComplete {
750 success: true,
751 final_state,
752 error: None,
753 transfer_id: complete
754 .transfer
755 .as_ref()
756 .map(|transfer| transfer.transfer_id.clone())
757 .unwrap_or_default(),
758 transport_mode: complete
759 .transfer
760 .as_ref()
761 .map(|transfer| transport_mode_name(transfer.transport_mode))
762 .unwrap_or("native-pack")
763 .to_string(),
764 resume_offset: complete
765 .transfer
766 .as_ref()
767 .map(|transfer| transfer.resume_offset)
768 .unwrap_or_default(),
769 chunk_index: complete
770 .transfer
771 .as_ref()
772 .map(|transfer| transfer.chunk_index)
773 .unwrap_or_default(),
774 checkpoint: complete
775 .transfer
776 .as_ref()
777 .map(|transfer| transfer.checkpoint.clone())
778 .unwrap_or_default(),
779 is_complete: complete
780 .transfer
781 .as_ref()
782 .map(|transfer| transfer.is_complete)
783 .unwrap_or(false),
784 },
785 object_count: received,
786 profile,
787 });
788 }
789
790 profile.objects_received = received;
791 return Ok(PullExchange {
792 result: PullComplete {
793 success: false,
794 final_state,
795 error: (!complete.error.is_empty()).then_some(complete.error),
796 transfer_id: complete
797 .transfer
798 .as_ref()
799 .map(|transfer| transfer.transfer_id.clone())
800 .unwrap_or_default(),
801 transport_mode: complete
802 .transfer
803 .as_ref()
804 .map(|transfer| transport_mode_name(transfer.transport_mode))
805 .unwrap_or("native-pack")
806 .to_string(),
807 resume_offset: complete
808 .transfer
809 .as_ref()
810 .map(|transfer| transfer.resume_offset)
811 .unwrap_or_default(),
812 chunk_index: complete
813 .transfer
814 .as_ref()
815 .map(|transfer| transfer.chunk_index)
816 .unwrap_or_default(),
817 checkpoint: complete
818 .transfer
819 .as_ref()
820 .map(|transfer| transfer.checkpoint.clone())
821 .unwrap_or_default(),
822 is_complete: complete
823 .transfer
824 .as_ref()
825 .map(|transfer| transfer.is_complete)
826 .unwrap_or(false),
827 },
828 object_count: received,
829 profile,
830 });
831 }
832 _ => {}
833 }
834 }
835
836 Err(ProtocolError::InvalidState(format!(
837 "pull stream ended unexpectedly after receiving {received} packed objects"
838 )))
839 }
840}
841
842fn load_thread_metadata(
843 repo: &Repository,
844 target_thread: &str,
845 local_state: ChangeId,
846) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
847 let thread_manager = ThreadManager::new(repo.heddle_dir());
848 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
849}
850
851fn plan_pull_wants(
852 repo: &Repository,
853 remote_state: &ChangeId,
854 full_closure_available: bool,
855 objects_to_fetch: Vec<ObjectDescriptor>,
856 allow_partial_fetch: bool,
857) -> Result<PullWantPlan, ProtocolError> {
858 if full_closure_available {
859 return Ok(PullWantPlan {
860 wants: Vec::new(),
861 wanted_types: HashMap::new(),
862 want_full_closure: true,
863 });
864 }
865 let request_full_closure =
866 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
867 let mut wants = Vec::with_capacity(objects_to_fetch.len());
868 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
869
870 for descriptor in objects_to_fetch {
871 let info = parse_descriptor_to_info(descriptor)?;
872 let pack_id = match &info.id {
873 proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
874 proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
875 };
876 let include = if request_full_closure {
877 true
878 } else {
879 let has = proto::has_object(repo.store(), &info)?;
880 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
881 };
882
883 if include {
884 wanted_types.insert(pack_id, info.obj_type);
885 wants.push(object_descriptor_with_status(
886 &info,
887 ObjectAvailabilityStatus::Missing,
888 "requested by client",
889 ));
890 }
891 }
892
893 Ok(PullWantPlan {
894 wants,
895 wanted_types,
896 want_full_closure: false,
897 })
898}
899
900fn supports_compact_full_pull(
901 repo: &Repository,
902 allow_partial_fetch: bool,
903 exclude_states: &[ChangeId],
904) -> Result<bool, ProtocolError> {
905 if allow_partial_fetch || !exclude_states.is_empty() {
906 return Ok(false);
907 }
908 repo_looks_fresh(repo)
909}
910
911fn should_request_full_closure(
912 repo: &Repository,
913 remote_state: &ChangeId,
914 allow_partial_fetch: bool,
915) -> Result<bool, ProtocolError> {
916 if allow_partial_fetch || repo.store().has_state(remote_state)? {
917 return Ok(false);
918 }
919 repo_looks_fresh(repo)
920}
921
922fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
923 if repo.head()?.is_some() {
924 return Ok(false);
925 }
926 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
927 return Ok(false);
928 }
929 Ok(repo.missing_blobs()?.is_empty())
930}
931
932fn infer_installed_hash_object_type(
933 repo: &Repository,
934 hash: &ContentHash,
935) -> Result<ObjectType, ProtocolError> {
936 let store = repo.store();
937 if store.get_tree(hash)?.is_some() {
938 return Ok(ObjectType::Tree);
939 }
940 if store
941 .get_action(&objects::object::ActionId::from_hash(*hash))?
942 .is_some()
943 {
944 return Ok(ObjectType::Action);
945 }
946 Ok(ObjectType::Blob)
947}
948
949fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
950 const HEADER: &str = "heddle-markers-v1\n";
951 if checkpoint.is_empty() {
952 return Ok(false);
953 }
954 let payload = std::str::from_utf8(checkpoint)
955 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
956 let Some(lines) = payload.strip_prefix(HEADER) else {
957 return Ok(false);
958 };
959
960 for line in lines.lines() {
961 if line.is_empty() {
962 continue;
963 }
964 let Some((name, change_id)) = line.split_once('\t') else {
965 return Err(ProtocolError::InvalidState(
966 "invalid marker snapshot line".to_string(),
967 ));
968 };
969 let change_id = ChangeId::parse(change_id)
970 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
971 if !repo.store().has_state(&change_id)? {
972 continue;
973 }
974 match repo.refs().get_marker(name)? {
975 Some(existing) if existing == change_id => {}
976 Some(existing) => repo.refs().set_marker_cas(
977 name,
978 refs::RefExpectation::Value(existing),
979 &change_id,
980 )?,
981 None => repo.refs().create_marker(name, &change_id)?,
982 }
983 }
984
985 Ok(true)
986}
987
988fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
989 if s.is_empty() {
990 return Vec::new();
991 }
992 objects::object::ChangeId::parse(s)
993 .map(|id| id.as_bytes().to_vec())
994 .unwrap_or_default()
995}
996
997fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
998 ThreadMetadata {
999 name: metadata.thread.clone(),
1000 target_thread: metadata.target_thread.clone(),
1001 parent_thread: metadata.parent_thread.clone(),
1002 task: metadata.task.clone(),
1003 thread_mode: metadata.mode.to_string(),
1004 thread_state: metadata.state.to_string(),
1005 freshness: metadata.freshness.to_string(),
1006 base_state: change_id_string_to_bytes(&metadata.base_state),
1007 base_root: change_id_string_to_bytes(&metadata.base_root),
1008 current_state: metadata
1009 .current_state
1010 .as_deref()
1011 .map(change_id_string_to_bytes),
1012 merged_state: metadata
1013 .merged_state
1014 .as_deref()
1015 .map(change_id_string_to_bytes),
1016 changed_paths: metadata.changed_paths.clone(),
1017 impact_categories: metadata
1018 .impact_categories
1019 .iter()
1020 .map(ToString::to_string)
1021 .collect(),
1022 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1023 promotion_suggested: metadata.promotion_suggested,
1024 verification_summary: Some(ThreadVerificationSummary {
1025 tests_passed: metadata.verification_summary.tests_passed,
1026 tests_failed: metadata
1027 .verification_summary
1028 .tests_failed
1029 .unwrap_or_default(),
1030 coverage_pct: metadata.verification_summary.coverage_pct,
1031 lint_warnings: metadata.verification_summary.lint_warnings,
1032 }),
1033 confidence_summary: Some(ThreadConfidenceSummary {
1034 value: metadata.confidence_summary.value,
1035 band: metadata
1036 .confidence_summary
1037 .band
1038 .as_ref()
1039 .map(ToString::to_string),
1040 }),
1041 integration_policy_result: Some(ThreadIntegrationPolicy {
1042 status: metadata
1043 .integration_policy_result
1044 .status
1045 .clone()
1046 .unwrap_or_default(),
1047 reason: metadata
1048 .integration_policy_result
1049 .reason
1050 .clone()
1051 .unwrap_or_default(),
1052 }),
1053 created_at: Some(prost_types::Timestamp {
1054 seconds: metadata.created_at.timestamp(),
1055 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1056 }),
1057 updated_at: Some(prost_types::Timestamp {
1058 seconds: metadata.updated_at.timestamp(),
1059 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1060 }),
1061 }
1062}
1063
1064struct PullExchange {
1065 result: PullComplete,
1066 object_count: usize,
1067 profile: PullProfile,
1068}
1069
1070fn mark_missing_blobs_for_state(
1071 repo: &Repository,
1072 state_id: ChangeId,
1073) -> Result<(), ProtocolError> {
1074 let state = repo
1075 .store()
1076 .get_state(&state_id)?
1077 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1078 let mut missing = collect_missing_blobs(repo, &state.tree);
1079 if let Some(context_root) = state.context.as_ref() {
1080 missing.extend(collect_missing_blobs(repo, context_root));
1081 }
1082 missing
1083 .into_iter()
1084 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1085}
1086
1087fn clear_missing_blobs_for_state(
1088 repo: &Repository,
1089 state_id: ChangeId,
1090) -> Result<(), ProtocolError> {
1091 let state = repo
1092 .store()
1093 .get_state(&state_id)?
1094 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1095 let mut missing = collect_missing_blobs(repo, &state.tree);
1096 if let Some(context_root) = state.context.as_ref() {
1097 missing.extend(collect_missing_blobs(repo, context_root));
1098 }
1099 missing
1100 .into_iter()
1101 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1102}
1103
1104fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1105 let mut missing = Vec::new();
1106 collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1107 missing
1108}
1109
1110fn collect_missing_blobs_recursive(
1111 repo: &Repository,
1112 tree_hash: &ContentHash,
1113 missing: &mut Vec<ContentHash>,
1114) {
1115 let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1116 return;
1117 };
1118 for entry in tree.entries() {
1119 match entry.entry_type {
1120 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1121 if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1122 missing.push(entry.hash);
1123 }
1124 }
1125 objects::object::EntryType::Tree => {
1126 collect_missing_blobs_recursive(repo, &entry.hash, missing);
1127 }
1128 }
1129 }
1130}
1131
1132fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1133 match repo.missing_blobs() {
1134 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1135 Ok(_) => PartialFetchStatus::Disabled as i32,
1136 Err(_) => PartialFetchStatus::Unspecified as i32,
1137 }
1138}
1139
1140fn pull_transfer_id(
1141 repo_path: &str,
1142 remote_thread: &str,
1143 local_thread: Option<&str>,
1144 depth: Option<u32>,
1145 target_state: Option<ChangeId>,
1146) -> String {
1147 format!(
1148 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1149 local_thread.unwrap_or_default(),
1150 target_state
1151 .map(|value| value.to_string_full())
1152 .unwrap_or_default()
1153 )
1154}
1155
1156fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1157 format!(
1158 "push:{repo_path}:{}:{target_thread}",
1159 local_state.to_string_full()
1160 )
1161}
1162
1163fn encode_native_pack_messages(
1164 bundle: &proto::NativePackBundle,
1165 transfer_id: &str,
1166 chunk_size: usize,
1167 transport: &super::helpers::HostedTransportPolicy,
1168 transport_mode: TransportMode,
1169) -> Result<Vec<PushMessage>, ProtocolError> {
1170 let mut messages = Vec::new();
1171 let chunk_size = chunk_size.max(1);
1172
1173 let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1174 for chunk_index in 0..pack_total_chunks.max(1) {
1175 let Some((start, len)) =
1176 proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1177 else {
1178 break;
1179 };
1180 messages.push(PushMessage {
1181 body: Some(push_message::Body::Pack(PackChunk {
1182 stream_kind: PackStreamKind::Pack as i32,
1183 data: bundle.pack_data[start..start + len].to_vec(),
1184 transfer: Some(transport.transfer_checkpoint_with_mode(
1185 transfer_id,
1186 transport_mode,
1187 chunk_index as u32,
1188 start as u64,
1189 chunk_index + 1 == pack_total_chunks,
1190 )),
1191 chunk_length: len as u32,
1192 is_final_chunk: chunk_index + 1 == pack_total_chunks,
1193 })),
1194 });
1195 }
1196
1197 let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1198 for chunk_index in 0..index_total_chunks.max(1) {
1199 let Some((start, len)) =
1200 proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1201 else {
1202 break;
1203 };
1204 messages.push(PushMessage {
1205 body: Some(push_message::Body::Pack(PackChunk {
1206 stream_kind: PackStreamKind::Index as i32,
1207 data: bundle.index_data[start..start + len].to_vec(),
1208 transfer: Some(transport.transfer_checkpoint_with_mode(
1209 transfer_id,
1210 transport_mode,
1211 chunk_index as u32,
1212 start as u64,
1213 chunk_index + 1 == index_total_chunks,
1214 )),
1215 chunk_length: len as u32,
1216 is_final_chunk: chunk_index + 1 == index_total_chunks,
1217 })),
1218 });
1219 }
1220 Ok(messages)
1221}
1222
1223fn preferred_transport_mode(
1224 transport: &super::helpers::HostedTransportPolicy,
1225 object_count: usize,
1226) -> TransportMode {
1227 let _ = transport;
1228 let _ = object_count;
1229 TransportMode::NativePack
1230}