1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7 GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9 ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata, ThreadVerificationSummary,
10 TransportMode, UpdateRefRequest, WantObjects, pull_message, push_message,
11};
12use objects::{
13 object::{ChangeId, ContentHash},
14 store::PackObjectId,
15};
16use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
17use repo::{Repository, SyncedThreadMetadata, ThreadManager};
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Request;
21
22use super::{
23 HostedGrpcClient, PullMaterialization,
24 helpers::{
25 descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
26 status_to_protocol_error, to_proto_object_info, transport_mode_name,
27 },
28};
29
/// Per-call settings for a single pull exchange.
///
/// Only holds borrowed or `Copy` data, so it is passed around by value.
#[derive(Clone, Copy)]
struct PullOptions<'a> {
    /// Local thread to sync into: its current head (if any) is sent as an
    /// excluded state, and the thread is moved to the final state on success.
    local_thread: Option<&'a str>,
    /// Depth forwarded in the pull request; `None` is sent as the default value.
    depth: Option<u32>,
    /// Specific state to request; `None` is sent as an empty string.
    target_state: Option<ChangeId>,
    /// Decides, via `allows_partial_fetch()`, whether partial fetch is requested.
    materialization: PullMaterialization,
}
37
/// Result of deciding which objects to request after the server's `PullReady`.
struct PullWantPlan {
    /// Descriptors sent back to the server in the `WantObjects` message.
    wants: Vec<ObjectDescriptor>,
    /// Object type recorded for each wanted pack id, used later to classify
    /// installed objects without touching the store.
    wanted_types: HashMap<PackObjectId, ObjectType>,
    /// When true the client asks for the server's full closure instead of an
    /// explicit object list.
    want_full_closure: bool,
}
43
/// Per-type counters for the objects installed by a pull.
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Number of blob objects counted.
    pub blobs: usize,
    /// Number of tree objects counted.
    pub trees: usize,
    /// Number of state objects counted.
    pub states: usize,
    /// Number of action objects counted.
    pub actions: usize,
    /// Number of redaction objects counted.
    pub redactions: usize,
}
52
53impl PullObjectMix {
54 fn record(&mut self, obj_type: ObjectType) {
55 match obj_type {
56 ObjectType::Blob => self.blobs += 1,
57 ObjectType::Tree => self.trees += 1,
58 ObjectType::State => self.states += 1,
59 ObjectType::Action => self.actions += 1,
60 ObjectType::Redaction => self.redactions += 1,
61 }
62 }
63
64 pub fn total(&self) -> usize {
65 self.blobs + self.trees + self.states + self.actions + self.redactions
66 }
67}
68
/// Timing and volume measurements collected during one pull exchange.
///
/// The `raw_*` fields are declared here but are not written by the pull code
/// visible in this file; they keep their default values there.
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until `PullReady` was received.
    pub ready_wait: Duration,
    /// Time from sending the want list until the `Complete` message arrived.
    pub receive_and_apply: Duration,
    /// Accumulated time spent decoding received chunks.
    pub decode: Duration,
    /// Time spent installing the received pack into the object store.
    pub store_receive_object: Duration,
    /// Time spent updating refs, missing-blob records and markers after install.
    pub metadata_sync: Duration,
    /// Accumulated per-chunk time for native-pack chunks (same increments as `pack_decode`).
    pub pack_decode_apply: Duration,
    /// Not updated by the visible pull path.
    pub raw_decode_apply: Duration,
    /// Accumulated time spent in `receive_pack_chunk` for native-pack chunks.
    pub pack_decode: Duration,
    /// Not updated by the visible pull path.
    pub raw_decode: Duration,
    /// Total payload bytes received across all chunks (saturating).
    pub bytes_received: usize,
    /// Payload bytes received in native-pack chunks (saturating).
    pub pack_bytes_received: usize,
    /// Not updated by the visible pull path.
    pub raw_bytes_received: usize,
    /// Number of object ids reported as installed from the received pack.
    pub objects_received: usize,
    /// Breakdown of installed objects by type.
    pub object_mix: PullObjectMix,
}
86
87impl HostedGrpcClient {
88 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
89 let mut request = Request::new(ListRefsRequest {
90 repo_path: repo_path.to_string(),
91 });
92 self.apply_auth(&mut request)?;
93 let response = self
94 .inner
95 .list_refs(request)
96 .await
97 .map_err(status_to_protocol_error)?
98 .into_inner();
99 response
100 .refs
101 .into_iter()
102 .map(|entry| {
103 Ok(RefEntry {
104 name: entry.name,
105 change_id: ChangeId::try_from_slice(&entry.change_id)
106 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
107 is_thread: entry.is_thread,
108 })
109 })
110 .collect()
111 }
112
113 #[allow(clippy::too_many_arguments)]
114 pub async fn update_ref(
115 &mut self,
116 repo_path: &str,
117 name: &str,
118 is_thread: bool,
119 old_value: Option<ChangeId>,
120 new_value: ChangeId,
121 force: bool,
122 thread_metadata: Option<&SyncedThreadMetadata>,
123 ) -> Result<RefUpdated, ProtocolError> {
124 let mut request = Request::new(UpdateRefRequest {
125 repo_path: repo_path.to_string(),
126 name: name.to_string(),
127 is_thread,
128 force,
129 old_value: old_value
130 .map(|value| value.to_string_full())
131 .unwrap_or_default(),
132 new_value: new_value.to_string_full(),
133 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
134 client_operation_id: String::new(),
135 });
136 self.apply_auth(&mut request)?;
137 let response = self
138 .inner
139 .update_ref(request)
140 .await
141 .map_err(status_to_protocol_error)?
142 .into_inner();
143 Ok(RefUpdated {
144 success: response.success,
145 old_value: if response.old_value.is_empty() {
146 None
147 } else {
148 Some(
149 ChangeId::parse(&response.old_value)
150 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
151 )
152 },
153 error: (!response.error.is_empty()).then_some(response.error),
154 })
155 }
156
    /// Pushes the object closure of `local_state` to `target_thread` on the remote.
    ///
    /// Sequence driven by this method:
    /// 1. send a `PushRequest` describing every object in the closure,
    /// 2. wait for the server's `Ready` message listing the objects it wants,
    /// 3. stream those objects as native-pack chunks,
    /// 4. close the request stream and read the `Complete` message,
    /// 5. on success, sync remote markers for `local_state`.
    ///
    /// # Errors
    /// Returns `ProtocolError::InvalidState` when the server answers with an
    /// unexpected message, asks for an object outside the closure, or the
    /// request stream closes early; RPC status errors are converted with
    /// `status_to_protocol_error`; store/pack errors are propagated.
    pub async fn push(
        &mut self,
        repo: &Repository,
        repo_path: &str,
        local_state: ChangeId,
        target_thread: &str,
        force: bool,
    ) -> Result<PushComplete, ProtocolError> {
        // These reads are discarded; presumably they only keep the transport
        // settings referenced — TODO confirm they can be removed.
        let _ = self.transport.chunk_size;
        let _ = self.transport.resume_attempts;
        let _ = self.transport.negotiated.chunk_size();
        let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
        let transport_mode = preferred_transport_mode(&self.transport, objects.len());
        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
        let request_message = PushMessage {
            body: Some(push_message::Body::Request(PushRequest {
                repo_path: repo_path.to_string(),
                local_state: local_state.to_string_full(),
                target_thread: target_thread.to_string(),
                create_thread: true,
                force,
                objects: objects.iter().map(to_proto_object_info).collect(),
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    transport_mode,
                    0,
                    0,
                    false,
                )),
                partial_fetch_status: partial_fetch_status_for_repo(repo),
                allow_partial_fetch: true,
                thread_metadata: thread_metadata
                    .map(|metadata| to_proto_thread_metadata(&metadata)),
                client_operation_id: String::new(),
            })),
        };

        // Channel capacity is the configured in-flight limit, but never below 4.
        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
        // Queue the opening request before the RPC is started.
        tx.send(request_message).await.map_err(|_| {
            ProtocolError::InvalidState("failed to initialize push stream".to_string())
        })?;
        let mut request = Request::new(ReceiverStream::new(rx));
        self.apply_auth(&mut request)?;
        let mut response = self
            .inner
            .push(request)
            .await
            .map_err(status_to_protocol_error)?
            .into_inner();

        // The first server message must be `Ready`; anything else (including
        // end of stream) is a protocol violation.
        let ready = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PushMessage {
                body: Some(push_message::Body::Ready(ready)),
            }) => ready,
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PushReady from gRPC server".to_string(),
                ));
            }
        };

        // Index the closure by descriptor id so the server's wants can be resolved.
        let object_index = objects
            .into_iter()
            .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
            .collect::<HashMap<_, _>>();

        // Use the transport mode echoed by the server when it decodes; otherwise
        // fall back to the mode this client proposed.
        let ready_transport_mode = ready
            .transfer
            .as_ref()
            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
            .unwrap_or(transport_mode);
        // Every wanted object must be part of the closure that was announced.
        let wanted_infos = ready
            .want_objects
            .into_iter()
            .map(|want| {
                object_index
                    .get(&descriptor_id(&want))
                    .cloned()
                    .ok_or_else(|| {
                        ProtocolError::InvalidState("server requested unknown object".to_string())
                    })
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Nothing is streamed when the server already has every object.
        if !wanted_infos.is_empty() {
            let bundle = proto::build_native_pack(repo.store(), &wanted_infos)?;
            for message in encode_native_pack_messages(
                &bundle,
                &transfer_id,
                self.transport.chunk_size.max(1),
                &self.transport,
                ready_transport_mode,
            )? {
                tx.send(message).await.map_err(|_| {
                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
                })?;
            }
        }
        // Dropping the only sender ends the request stream.
        drop(tx);

        // The next server message must be `Complete`. Empty strings and a
        // missing transfer checkpoint are mapped to `None`/default values.
        let result = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PushMessage {
                body: Some(push_message::Body::Complete(complete)),
            }) => PushComplete {
                success: complete.success,
                new_state: if complete.new_state.is_empty() {
                    None
                } else {
                    Some(
                        ChangeId::parse(&complete.new_state)
                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
                    )
                },
                error: (!complete.error.is_empty()).then_some(complete.error),
                transfer_id: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.transfer_id.clone())
                    .unwrap_or_default(),
                transport_mode: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transport_mode_name(transfer.transport_mode))
                    .unwrap_or("raw-objects")
                    .to_string(),
                resume_offset: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.resume_offset)
                    .unwrap_or_default(),
                chunk_index: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.chunk_index)
                    .unwrap_or_default(),
                checkpoint: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.checkpoint.clone())
                    .unwrap_or_default(),
                is_complete: complete
                    .transfer
                    .as_ref()
                    .map(|transfer| transfer.is_complete)
                    .unwrap_or(false),
            },
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PushComplete from gRPC server".to_string(),
                ));
            }
        };

        // Marker sync only runs after the server reported success.
        if result.success {
            self.sync_remote_markers(repo, repo_path, local_state)
                .await?;
        }
        Ok(result)
    }
317
318 pub async fn pull(
319 &mut self,
320 repo: &Repository,
321 repo_path: &str,
322 remote_thread: &str,
323 local_thread: Option<&str>,
324 ) -> Result<PullComplete, ProtocolError> {
325 self.pull_with_options(
326 repo,
327 repo_path,
328 remote_thread,
329 PullOptions {
330 local_thread,
331 depth: None,
332 target_state: None,
333 materialization: PullMaterialization::Full,
334 },
335 )
336 .await
337 }
338
339 pub async fn pull_profiled(
340 &mut self,
341 repo: &Repository,
342 repo_path: &str,
343 remote_thread: &str,
344 local_thread: Option<&str>,
345 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
346 self.pull_exchange(
347 repo,
348 repo_path,
349 remote_thread,
350 PullOptions {
351 local_thread,
352 depth: None,
353 target_state: None,
354 materialization: PullMaterialization::Full,
355 },
356 )
357 .await
358 .map(|exchange| (exchange.result, exchange.profile))
359 }
360
361 pub async fn pull_partial(
362 &mut self,
363 repo: &Repository,
364 repo_path: &str,
365 remote_thread: &str,
366 local_thread: Option<&str>,
367 ) -> Result<PullComplete, ProtocolError> {
368 self.pull_with_options(
369 repo,
370 repo_path,
371 remote_thread,
372 PullOptions {
373 local_thread,
374 depth: None,
375 target_state: None,
376 materialization: PullMaterialization::Lazy,
377 },
378 )
379 .await
380 }
381
382 pub async fn pull_with_depth_and_materialization(
383 &mut self,
384 repo: &Repository,
385 repo_path: &str,
386 remote_thread: &str,
387 local_thread: Option<&str>,
388 depth: Option<u32>,
389 materialization: PullMaterialization,
390 ) -> Result<PullComplete, ProtocolError> {
391 self.pull_with_options(
392 repo,
393 repo_path,
394 remote_thread,
395 PullOptions {
396 local_thread,
397 depth,
398 target_state: None,
399 materialization,
400 },
401 )
402 .await
403 }
404
405 pub async fn pull_with_depth(
406 &mut self,
407 repo: &Repository,
408 repo_path: &str,
409 remote_thread: &str,
410 local_thread: Option<&str>,
411 depth: Option<u32>,
412 ) -> Result<PullComplete, ProtocolError> {
413 self.pull_with_depth_and_materialization(
414 repo,
415 repo_path,
416 remote_thread,
417 local_thread,
418 depth,
419 PullMaterialization::Full,
420 )
421 .await
422 }
423
424 pub async fn fetch_state(
425 &mut self,
426 repo: &Repository,
427 repo_path: &str,
428 remote_thread: &str,
429 target_state: ChangeId,
430 ) -> Result<usize, ProtocolError> {
431 self.pull_exchange(
432 repo,
433 repo_path,
434 remote_thread,
435 PullOptions {
436 local_thread: None,
437 depth: None,
438 target_state: Some(target_state),
439 materialization: PullMaterialization::Full,
440 },
441 )
442 .await
443 .map(|exchange| exchange.object_count)
444 }
445
446 pub async fn fetch_state_partial(
447 &mut self,
448 repo: &Repository,
449 repo_path: &str,
450 remote_thread: &str,
451 target_state: ChangeId,
452 ) -> Result<usize, ProtocolError> {
453 self.pull_exchange(
454 repo,
455 repo_path,
456 remote_thread,
457 PullOptions {
458 local_thread: None,
459 depth: None,
460 target_state: Some(target_state),
461 materialization: PullMaterialization::Lazy,
462 },
463 )
464 .await
465 .map(|exchange| exchange.object_count)
466 }
467
468 pub async fn hydrate_blob_at_path(
469 &mut self,
470 repo: &Repository,
471 repo_path: &str,
472 reference: &str,
473 path: &str,
474 ) -> Result<objects::object::Blob, ProtocolError> {
475 let mut request = Request::new(GetBlobRequest {
476 repo_path: repo_path.to_string(),
477 r#ref: reference.to_string(),
478 path: path.to_string(),
479 });
480 self.apply_auth(&mut request)?;
481 let response = self
482 .content
483 .get_blob(request)
484 .await
485 .map_err(status_to_protocol_error)?
486 .into_inner();
487
488 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
489 let blob = objects::object::Blob::new(content);
490 repo.store().put_blob(&blob)?;
491 repo.clear_missing_blob(&blob.hash())?;
492 Ok(blob)
493 }
494
495 pub async fn hydrate_missing_blobs_for_state(
496 &mut self,
497 repo: &Repository,
498 repo_path: &str,
499 remote_thread: &str,
500 target_state: ChangeId,
501 ) -> Result<usize, ProtocolError> {
502 let exchange = self
503 .pull_exchange(
504 repo,
505 repo_path,
506 remote_thread,
507 PullOptions {
508 local_thread: None,
509 depth: None,
510 target_state: Some(target_state),
511 materialization: PullMaterialization::Full,
512 },
513 )
514 .await?;
515 clear_missing_blobs_for_state(repo, target_state)?;
516 Ok(exchange.object_count)
517 }
518
519 async fn pull_with_options(
520 &mut self,
521 repo: &Repository,
522 repo_path: &str,
523 remote_thread: &str,
524 options: PullOptions<'_>,
525 ) -> Result<PullComplete, ProtocolError> {
526 self.pull_exchange(repo, repo_path, remote_thread, options)
527 .await
528 .map(|exchange| exchange.result)
529 }
530
    /// Drives one complete pull conversation with the server.
    ///
    /// Sequence: send a `PullRequest`, wait for `Ready`, answer with a
    /// `WantObjects` message built by `plan_pull_wants`, close the request
    /// stream, then consume pack chunks until a `Complete` message arrives.
    /// On success the received pack is installed, the optional local thread is
    /// moved, missing-blob records are updated and markers are synced.
    ///
    /// Returns the final `PullComplete`, the number of installed objects and
    /// the timing profile.
    ///
    /// # Errors
    /// Returns `ProtocolError::InvalidState` for unexpected or malformed server
    /// messages and when the stream ends without `Complete`; RPC status errors
    /// are converted with `status_to_protocol_error`; store and ref errors are
    /// propagated. A `Complete` with `success == false` is NOT an `Err`: it is
    /// returned as `Ok` with `result.success == false`.
    async fn pull_exchange(
        &mut self,
        repo: &Repository,
        repo_path: &str,
        remote_thread: &str,
        options: PullOptions<'_>,
    ) -> Result<PullExchange, ProtocolError> {
        let exchange_start = Instant::now();
        // The current head of the local thread (if any) is sent as an excluded state.
        let mut exclude_states = Vec::new();
        if let Some(local_thread) = options.local_thread
            && let Some(head) = repo.refs().get_thread(local_thread)?
        {
            exclude_states.push(head);
        }
        let allow_partial_fetch = options.materialization.allows_partial_fetch();
        let fresh_full_pull =
            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;

        let transfer_id = pull_transfer_id(
            repo_path,
            remote_thread,
            options.local_thread,
            options.depth,
            options.target_state,
        );
        // Absent options are encoded as empty strings / default numbers on the wire.
        let request_message = PullMessage {
            body: Some(pull_message::Body::Request(PullRequest {
                repo_path: repo_path.to_string(),
                remote_thread: remote_thread.to_string(),
                local_thread: options.local_thread.unwrap_or_default().to_string(),
                target_state: options
                    .target_state
                    .map(|value| value.to_string_full())
                    .unwrap_or_default(),
                depth: options.depth.unwrap_or_default(),
                exclude_states: exclude_states
                    .iter()
                    .map(ChangeId::to_string_full)
                    .collect(),
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    TransportMode::NativePack,
                    0,
                    0,
                    false,
                )),
                partial_fetch_status: partial_fetch_status_for_repo(repo),
                allow_partial_fetch,
                fresh_full_pull,
                client_operation_id: String::new(),
            })),
        };

        // Channel capacity is the configured in-flight limit, but never below 4.
        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
        // Queue the opening request before the RPC is started.
        tx.send(request_message).await.map_err(|_| {
            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
        })?;
        let mut request = Request::new(ReceiverStream::new(rx));
        self.apply_auth(&mut request)?;
        let mut response = self
            .inner
            .pull(request)
            .await
            .map_err(status_to_protocol_error)?
            .into_inner();

        // The first server message must be `Ready`.
        let ready = match response.message().await.map_err(status_to_protocol_error)? {
            Some(PullMessage {
                body: Some(pull_message::Body::Ready(ready)),
            }) => ready,
            _ => {
                return Err(ProtocolError::InvalidState(
                    "expected PullReady from gRPC server".to_string(),
                ));
            }
        };
        let mut profile = PullProfile {
            ready_wait: exchange_start.elapsed(),
            ..PullProfile::default()
        };
        let remote_state = ChangeId::parse(&ready.remote_state)
            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
        let PullWantPlan {
            wants,
            wanted_types,
            want_full_closure,
        } = plan_pull_wants(
            repo,
            &remote_state,
            ready.full_closure_available,
            ready.objects_to_fetch,
            allow_partial_fetch,
        )?;

        // Tell the server what to send; this is the last client message.
        tx.send(PullMessage {
            body: Some(pull_message::Body::Want(WantObjects {
                objects: wants.clone(),
                want_full_closure,
                transfer: Some(self.transport.transfer_checkpoint_with_mode(
                    transfer_id.clone(),
                    TransportMode::NativePack,
                    0,
                    0,
                    false,
                )),
            })),
        })
        .await
        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
        // Dropping the only sender ends the request stream.
        drop(tx);

        let receive_start = Instant::now();
        let mut pack_state = proto::PackChunkState::default();
        let mut received = 0usize;
        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
            match message.body {
                Some(pull_message::Body::Pack(chunk)) => {
                    // Byte counters saturate instead of overflowing.
                    profile.bytes_received =
                        profile.bytes_received.saturating_add(chunk.data.len());
                    profile.pack_bytes_received =
                        profile.pack_bytes_received.saturating_add(chunk.data.len());
                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
                        ProtocolError::InvalidState(
                            "native pack chunk missing transfer checkpoint".to_string(),
                        )
                    })?;
                    // An undecodable stream kind is treated like `Unspecified` and rejected.
                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
                        .unwrap_or(PackStreamKind::Unspecified);
                    if stream_kind == PackStreamKind::Unspecified {
                        return Err(ProtocolError::InvalidState(
                            "native pack chunk missing stream kind".to_string(),
                        ));
                    }
                    let decode_start = Instant::now();
                    proto::receive_pack_chunk(
                        &mut pack_state,
                        stream_kind == PackStreamKind::Index,
                        transfer.resume_offset,
                        transfer.chunk_index,
                        transfer.is_complete,
                        &chunk.data,
                        chunk.is_final_chunk,
                    )?;
                    // The same elapsed time feeds all three decode counters.
                    let decode_elapsed = decode_start.elapsed();
                    profile.pack_decode += decode_elapsed;
                    profile.pack_decode_apply += decode_elapsed;
                    profile.decode += decode_elapsed;
                }
                Some(pull_message::Body::Complete(complete)) => {
                    profile.receive_and_apply = receive_start.elapsed();
                    // An empty `new_state` means the server reported no final state.
                    let final_state = if complete.new_state.is_empty() {
                        None
                    } else {
                        Some(
                            ChangeId::parse(&complete.new_state)
                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
                        )
                    };

                    if complete.success {
                        // A pack is only expected when something was actually requested.
                        if want_full_closure || !wants.is_empty() {
                            if !pack_state.is_complete() {
                                return Err(ProtocolError::InvalidState(
                                    "pull completed before native pack stream finished".to_string(),
                                ));
                            }
                            let store_start = Instant::now();
                            let installed_ids = proto::install_received_pack(
                                repo.store(),
                                &pack_state.pack_data,
                                &pack_state.index_data,
                            )?;
                            profile.store_receive_object += store_start.elapsed();
                            received = installed_ids.len();
                            // Classify each installed object: prefer the type recorded in
                            // the want plan, otherwise derive it from the id kind or the store.
                            for id in installed_ids {
                                match (id, wanted_types.get(&id).copied()) {
                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
                                        profile.object_mix.record(ObjectType::Blob);
                                        // A wanted blob that arrived is no longer missing.
                                        repo.clear_missing_blob(&hash)?;
                                    }
                                    (_, Some(obj_type)) => {
                                        profile.object_mix.record(obj_type);
                                    }
                                    (PackObjectId::ChangeId(_), None) => {
                                        profile.object_mix.record(ObjectType::State);
                                    }
                                    (PackObjectId::Hash(hash), None) => {
                                        let inferred =
                                            infer_installed_hash_object_type(repo, &hash)?;
                                        profile.object_mix.record(inferred);
                                    }
                                }
                            }
                        }

                        let metadata_start = Instant::now();
                        if let Some(local_thread) = options.local_thread
                            && let Some(state) = final_state
                        {
                            repo.refs().set_thread(local_thread, &state)?;
                        }
                        // Partial fetch: record which blobs of the final state are absent.
                        // Otherwise, with a final state present, all missing-blob records
                        // in the repository are cleared.
                        if let Some(state) = final_state
                            && allow_partial_fetch
                        {
                            mark_missing_blobs_for_state(repo, state)?;
                        } else if final_state.is_some() {
                            let _ = repo.clear_all_missing_blobs()?;
                        }
                        // Prefer the marker snapshot carried in the checkpoint; fall back
                        // to a separate marker sync when none was applied.
                        let synced_markers = complete
                            .transfer
                            .as_ref()
                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
                            .transpose()?
                            .unwrap_or(false);
                        if !synced_markers {
                            self.sync_local_markers(repo, repo_path).await?;
                        }
                        profile.metadata_sync = metadata_start.elapsed();
                        profile.objects_received = received;
                        return Ok(PullExchange {
                            result: PullComplete {
                                success: true,
                                final_state,
                                error: None,
                                transfer_id: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.transfer_id.clone())
                                    .unwrap_or_default(),
                                transport_mode: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
                                    .unwrap_or("native-pack")
                                    .to_string(),
                                resume_offset: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.resume_offset)
                                    .unwrap_or_default(),
                                chunk_index: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.chunk_index)
                                    .unwrap_or_default(),
                                checkpoint: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.checkpoint.clone())
                                    .unwrap_or_default(),
                                is_complete: complete
                                    .transfer
                                    .as_ref()
                                    .map(|transfer| transfer.is_complete)
                                    .unwrap_or(false),
                            },
                            object_count: received,
                            profile,
                        });
                    }

                    // Server-reported failure: nothing is installed or synced; the
                    // failure is handed back to the caller inside an `Ok`.
                    profile.objects_received = received;
                    return Ok(PullExchange {
                        result: PullComplete {
                            success: false,
                            final_state,
                            error: (!complete.error.is_empty()).then_some(complete.error),
                            transfer_id: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.transfer_id.clone())
                                .unwrap_or_default(),
                            transport_mode: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transport_mode_name(transfer.transport_mode))
                                .unwrap_or("native-pack")
                                .to_string(),
                            resume_offset: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.resume_offset)
                                .unwrap_or_default(),
                            chunk_index: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.chunk_index)
                                .unwrap_or_default(),
                            checkpoint: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.checkpoint.clone())
                                .unwrap_or_default(),
                            is_complete: complete
                                .transfer
                                .as_ref()
                                .map(|transfer| transfer.is_complete)
                                .unwrap_or(false),
                        },
                        object_count: received,
                        profile,
                    });
                }
                // Any other message body (or none) is ignored.
                _ => {}
            }
        }

        // The stream ended without a `Complete` message. `received` is only
        // assigned inside the `Complete` branch, which returns, so it is still 0 here.
        Err(ProtocolError::InvalidState(format!(
            "pull stream ended unexpectedly after receiving {received} packed objects"
        )))
    }
842}
843
844fn load_thread_metadata(
845 repo: &Repository,
846 target_thread: &str,
847 local_state: ChangeId,
848) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
849 let thread_manager = ThreadManager::new(repo.heddle_dir());
850 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
851}
852
853fn plan_pull_wants(
854 repo: &Repository,
855 remote_state: &ChangeId,
856 full_closure_available: bool,
857 objects_to_fetch: Vec<ObjectDescriptor>,
858 allow_partial_fetch: bool,
859) -> Result<PullWantPlan, ProtocolError> {
860 if full_closure_available {
861 return Ok(PullWantPlan {
862 wants: Vec::new(),
863 wanted_types: HashMap::new(),
864 want_full_closure: true,
865 });
866 }
867 let request_full_closure =
868 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
869 let mut wants = Vec::with_capacity(objects_to_fetch.len());
870 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
871
872 for descriptor in objects_to_fetch {
873 let info = parse_descriptor_to_info(descriptor)?;
874 let pack_id = match &info.id {
875 proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
876 proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
877 };
878 let include = if request_full_closure {
879 true
880 } else {
881 let has = proto::has_object(repo.store(), &info)?;
882 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
883 };
884
885 if include {
886 wanted_types.insert(pack_id, info.obj_type);
887 wants.push(object_descriptor_with_status(
888 &info,
889 ObjectAvailabilityStatus::Missing,
890 "requested by client",
891 ));
892 }
893 }
894
895 Ok(PullWantPlan {
896 wants,
897 wanted_types,
898 want_full_closure: false,
899 })
900}
901
902fn supports_compact_full_pull(
903 repo: &Repository,
904 allow_partial_fetch: bool,
905 exclude_states: &[ChangeId],
906) -> Result<bool, ProtocolError> {
907 if allow_partial_fetch || !exclude_states.is_empty() {
908 return Ok(false);
909 }
910 repo_looks_fresh(repo)
911}
912
913fn should_request_full_closure(
914 repo: &Repository,
915 remote_state: &ChangeId,
916 allow_partial_fetch: bool,
917) -> Result<bool, ProtocolError> {
918 if allow_partial_fetch || repo.store().has_state(remote_state)? {
919 return Ok(false);
920 }
921 repo_looks_fresh(repo)
922}
923
924fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
925 if repo.head()?.is_some() {
926 return Ok(false);
927 }
928 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
929 return Ok(false);
930 }
931 Ok(repo.missing_blobs()?.is_empty())
932}
933
934fn infer_installed_hash_object_type(
935 repo: &Repository,
936 hash: &ContentHash,
937) -> Result<ObjectType, ProtocolError> {
938 let store = repo.store();
939 if store.get_tree(hash)?.is_some() {
940 return Ok(ObjectType::Tree);
941 }
942 if store
943 .get_action(&objects::object::ActionId::from_hash(*hash))?
944 .is_some()
945 {
946 return Ok(ObjectType::Action);
947 }
948 Ok(ObjectType::Blob)
949}
950
951fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
952 const HEADER: &str = "heddle-markers-v1\n";
953 if checkpoint.is_empty() {
954 return Ok(false);
955 }
956 let payload = std::str::from_utf8(checkpoint)
957 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
958 let Some(lines) = payload.strip_prefix(HEADER) else {
959 return Ok(false);
960 };
961
962 for line in lines.lines() {
963 if line.is_empty() {
964 continue;
965 }
966 let Some((name, change_id)) = line.split_once('\t') else {
967 return Err(ProtocolError::InvalidState(
968 "invalid marker snapshot line".to_string(),
969 ));
970 };
971 let change_id = ChangeId::parse(change_id)
972 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
973 if !repo.store().has_state(&change_id)? {
974 continue;
975 }
976 match repo.refs().get_marker(name)? {
977 Some(existing) if existing == change_id => {}
978 Some(existing) => repo.refs().set_marker_cas(
979 name,
980 refs::RefExpectation::Value(existing),
981 &change_id,
982 )?,
983 None => repo.refs().create_marker(name, &change_id)?,
984 }
985 }
986
987 Ok(true)
988}
989
990fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
991 if s.is_empty() {
992 return Vec::new();
993 }
994 objects::object::ChangeId::parse(s)
995 .map(|id| id.as_bytes().to_vec())
996 .unwrap_or_default()
997}
998
999fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1000 ThreadMetadata {
1001 name: metadata.thread.clone(),
1002 target_thread: metadata.target_thread.clone(),
1003 parent_thread: metadata.parent_thread.clone(),
1004 task: metadata.task.clone(),
1005 thread_mode: metadata.mode.to_string(),
1006 thread_state: metadata.state.to_string(),
1007 freshness: metadata.freshness.to_string(),
1008 base_state: change_id_string_to_bytes(&metadata.base_state),
1009 base_root: change_id_string_to_bytes(&metadata.base_root),
1010 current_state: metadata
1011 .current_state
1012 .as_deref()
1013 .map(change_id_string_to_bytes),
1014 merged_state: metadata
1015 .merged_state
1016 .as_deref()
1017 .map(change_id_string_to_bytes),
1018 changed_paths: metadata.changed_paths.clone(),
1019 impact_categories: metadata
1020 .impact_categories
1021 .iter()
1022 .map(ToString::to_string)
1023 .collect(),
1024 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1025 promotion_suggested: metadata.promotion_suggested,
1026 verification_summary: Some(ThreadVerificationSummary {
1027 tests_passed: metadata.verification_summary.tests_passed,
1028 tests_failed: metadata
1029 .verification_summary
1030 .tests_failed
1031 .unwrap_or_default(),
1032 coverage_pct: metadata.verification_summary.coverage_pct,
1033 lint_warnings: metadata.verification_summary.lint_warnings,
1034 }),
1035 confidence_summary: Some(ThreadConfidenceSummary {
1036 value: metadata.confidence_summary.value,
1037 band: metadata
1038 .confidence_summary
1039 .band
1040 .as_ref()
1041 .map(ToString::to_string),
1042 }),
1043 integration_policy_result: Some(ThreadIntegrationPolicy {
1044 status: metadata
1045 .integration_policy_result
1046 .status
1047 .clone()
1048 .unwrap_or_default(),
1049 reason: metadata
1050 .integration_policy_result
1051 .reason
1052 .clone()
1053 .unwrap_or_default(),
1054 }),
1055 created_at: Some(prost_types::Timestamp {
1056 seconds: metadata.created_at.timestamp(),
1057 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1058 }),
1059 updated_at: Some(prost_types::Timestamp {
1060 seconds: metadata.updated_at.timestamp(),
1061 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1062 }),
1063 }
1064}
1065
/// Everything a finished pull exchange produces; the public entry points each
/// pick the parts they expose.
struct PullExchange {
    /// Outcome reported to callers of the `pull*` methods.
    result: PullComplete,
    /// Number of object ids installed from the received pack.
    object_count: usize,
    /// Timing and volume measurements for the exchange.
    profile: PullProfile,
}
1071
1072fn mark_missing_blobs_for_state(
1073 repo: &Repository,
1074 state_id: ChangeId,
1075) -> Result<(), ProtocolError> {
1076 let state = repo
1077 .store()
1078 .get_state(&state_id)?
1079 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1080 let mut missing = collect_missing_blobs(repo, &state.tree);
1081 if let Some(context_root) = state.context.as_ref() {
1082 missing.extend(collect_missing_blobs(repo, context_root));
1083 }
1084 missing
1085 .into_iter()
1086 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1087}
1088
1089fn clear_missing_blobs_for_state(
1090 repo: &Repository,
1091 state_id: ChangeId,
1092) -> Result<(), ProtocolError> {
1093 let state = repo
1094 .store()
1095 .get_state(&state_id)?
1096 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1097 let mut missing = collect_missing_blobs(repo, &state.tree);
1098 if let Some(context_root) = state.context.as_ref() {
1099 missing.extend(collect_missing_blobs(repo, context_root));
1100 }
1101 missing
1102 .into_iter()
1103 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1104}
1105
1106fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1107 let mut missing = Vec::new();
1108 collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1109 missing
1110}
1111
1112fn collect_missing_blobs_recursive(
1113 repo: &Repository,
1114 tree_hash: &ContentHash,
1115 missing: &mut Vec<ContentHash>,
1116) {
1117 let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1118 return;
1119 };
1120 for entry in tree.entries() {
1121 match entry.entry_type {
1122 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1123 if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1124 missing.push(entry.hash);
1125 }
1126 }
1127 objects::object::EntryType::Tree => {
1128 collect_missing_blobs_recursive(repo, &entry.hash, missing);
1129 }
1130 }
1131 }
1132}
1133
1134fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1135 match repo.missing_blobs() {
1136 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1137 Ok(_) => PartialFetchStatus::Disabled as i32,
1138 Err(_) => PartialFetchStatus::Unspecified as i32,
1139 }
1140}
1141
1142fn pull_transfer_id(
1143 repo_path: &str,
1144 remote_thread: &str,
1145 local_thread: Option<&str>,
1146 depth: Option<u32>,
1147 target_state: Option<ChangeId>,
1148) -> String {
1149 format!(
1150 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1151 local_thread.unwrap_or_default(),
1152 target_state
1153 .map(|value| value.to_string_full())
1154 .unwrap_or_default()
1155 )
1156}
1157
1158fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1159 format!(
1160 "push:{repo_path}:{}:{target_thread}",
1161 local_state.to_string_full()
1162 )
1163}
1164
1165fn encode_native_pack_messages(
1166 bundle: &proto::NativePackBundle,
1167 transfer_id: &str,
1168 chunk_size: usize,
1169 transport: &super::helpers::HostedTransportPolicy,
1170 transport_mode: TransportMode,
1171) -> Result<Vec<PushMessage>, ProtocolError> {
1172 let mut messages = Vec::new();
1173 let chunk_size = chunk_size.max(1);
1174
1175 let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1176 for chunk_index in 0..pack_total_chunks.max(1) {
1177 let Some((start, len)) =
1178 proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1179 else {
1180 break;
1181 };
1182 messages.push(PushMessage {
1183 body: Some(push_message::Body::Pack(PackChunk {
1184 stream_kind: PackStreamKind::Pack as i32,
1185 data: bundle.pack_data[start..start + len].to_vec(),
1186 transfer: Some(transport.transfer_checkpoint_with_mode(
1187 transfer_id,
1188 transport_mode,
1189 chunk_index as u32,
1190 start as u64,
1191 chunk_index + 1 == pack_total_chunks,
1192 )),
1193 chunk_length: len as u32,
1194 is_final_chunk: chunk_index + 1 == pack_total_chunks,
1195 })),
1196 });
1197 }
1198
1199 let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1200 for chunk_index in 0..index_total_chunks.max(1) {
1201 let Some((start, len)) =
1202 proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1203 else {
1204 break;
1205 };
1206 messages.push(PushMessage {
1207 body: Some(push_message::Body::Pack(PackChunk {
1208 stream_kind: PackStreamKind::Index as i32,
1209 data: bundle.index_data[start..start + len].to_vec(),
1210 transfer: Some(transport.transfer_checkpoint_with_mode(
1211 transfer_id,
1212 transport_mode,
1213 chunk_index as u32,
1214 start as u64,
1215 chunk_index + 1 == index_total_chunks,
1216 )),
1217 chunk_length: len as u32,
1218 is_final_chunk: chunk_index + 1 == index_total_chunks,
1219 })),
1220 });
1221 }
1222 Ok(messages)
1223}
1224
1225fn preferred_transport_mode(
1226 transport: &super::helpers::HostedTransportPolicy,
1227 object_count: usize,
1228) -> TransportMode {
1229 let _ = transport;
1230 let _ = object_count;
1231 TransportMode::NativePack
1232}