1use std::{
2 collections::HashMap,
3 time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7 GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8 PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9 RedactionTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata,
10 ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects, pull_message,
11 push_message,
12};
13use objects::{
14 object::{ChangeId, ContentHash},
15 store::PackObjectId,
16};
17use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
18use repo::{Repository, SyncedThreadMetadata, ThreadManager};
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::Request;
22
23use super::{
24 HostedGrpcClient, PullMaterialization,
25 helpers::{
26 descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
27 status_to_protocol_error, to_proto_object_info, transport_mode_name,
28 },
29};
30
31#[derive(Clone, Copy)]
32struct PullOptions<'a> {
33 local_thread: Option<&'a str>,
34 depth: Option<u32>,
35 target_state: Option<ChangeId>,
36 materialization: PullMaterialization,
37}
38
39struct PullWantPlan {
40 wants: Vec<ObjectDescriptor>,
41 wanted_types: HashMap<PackObjectId, ObjectType>,
42 want_full_closure: bool,
43}
44
/// Per-type tally of the objects seen during a pull.
///
/// All counters start at zero through `Default`.
#[derive(Debug, Clone, Default)]
pub struct PullObjectMix {
    /// Number of blob objects.
    pub blobs: usize,
    /// Number of tree objects.
    pub trees: usize,
    /// Number of state objects.
    pub states: usize,
    /// Number of action objects.
    pub actions: usize,
    /// Number of redaction transfers.
    pub redactions: usize,
}
53
54impl PullObjectMix {
55 fn record(&mut self, obj_type: ObjectType) {
56 match obj_type {
57 ObjectType::Blob => self.blobs += 1,
58 ObjectType::Tree => self.trees += 1,
59 ObjectType::State => self.states += 1,
60 ObjectType::Action => self.actions += 1,
61 ObjectType::Redaction => self.redactions += 1,
62 }
63 }
64
65 pub fn total(&self) -> usize {
66 self.blobs + self.trees + self.states + self.actions + self.redactions
67 }
68}
69
70#[derive(Debug, Clone, Default)]
71pub struct PullProfile {
72 pub ready_wait: Duration,
73 pub receive_and_apply: Duration,
74 pub decode: Duration,
75 pub store_receive_object: Duration,
76 pub metadata_sync: Duration,
77 pub pack_decode_apply: Duration,
78 pub raw_decode_apply: Duration,
79 pub pack_decode: Duration,
80 pub raw_decode: Duration,
81 pub bytes_received: usize,
82 pub pack_bytes_received: usize,
83 pub raw_bytes_received: usize,
84 pub objects_received: usize,
85 pub object_mix: PullObjectMix,
86}
87
88impl HostedGrpcClient {
89 pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
90 let mut request = Request::new(ListRefsRequest {
91 repo_path: repo_path.to_string(),
92 });
93 self.apply_auth(&mut request)?;
94 let response = self
95 .inner
96 .list_refs(request)
97 .await
98 .map_err(status_to_protocol_error)?
99 .into_inner();
100 response
101 .refs
102 .into_iter()
103 .map(|entry| {
104 Ok(RefEntry {
105 name: entry.name,
106 change_id: ChangeId::try_from_slice(&entry.change_id)
107 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
108 is_thread: entry.is_thread,
109 })
110 })
111 .collect()
112 }
113
114 #[allow(clippy::too_many_arguments)]
115 pub async fn update_ref(
116 &mut self,
117 repo_path: &str,
118 name: &str,
119 is_thread: bool,
120 old_value: Option<ChangeId>,
121 new_value: ChangeId,
122 force: bool,
123 thread_metadata: Option<&SyncedThreadMetadata>,
124 ) -> Result<RefUpdated, ProtocolError> {
125 let mut request = Request::new(UpdateRefRequest {
126 repo_path: repo_path.to_string(),
127 name: name.to_string(),
128 is_thread,
129 force,
130 old_value: old_value
131 .map(|value| value.to_string_full())
132 .unwrap_or_default(),
133 new_value: new_value.to_string_full(),
134 thread_metadata: thread_metadata.map(to_proto_thread_metadata),
135 client_operation_id: String::new(),
136 });
137 self.apply_auth(&mut request)?;
138 let response = self
139 .inner
140 .update_ref(request)
141 .await
142 .map_err(status_to_protocol_error)?
143 .into_inner();
144 Ok(RefUpdated {
145 success: response.success,
146 old_value: if response.old_value.is_empty() {
147 None
148 } else {
149 Some(
150 ChangeId::parse(&response.old_value)
151 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
152 )
153 },
154 error: (!response.error.is_empty()).then_some(response.error),
155 })
156 }
157
158 pub async fn push(
159 &mut self,
160 repo: &Repository,
161 repo_path: &str,
162 local_state: ChangeId,
163 target_thread: &str,
164 force: bool,
165 ) -> Result<PushComplete, ProtocolError> {
166 let _ = self.transport.chunk_size;
167 let _ = self.transport.resume_attempts;
168 let _ = self.transport.negotiated.chunk_size();
169 let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
170 let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
171 let transport_mode = preferred_transport_mode(&self.transport, objects.len());
172 let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
173 let request_message = PushMessage {
174 body: Some(push_message::Body::Request(PushRequest {
175 repo_path: repo_path.to_string(),
176 local_state: local_state.to_string_full(),
177 target_thread: target_thread.to_string(),
178 create_thread: true,
179 force,
180 objects: objects.iter().map(to_proto_object_info).collect(),
181 transfer: Some(self.transport.transfer_checkpoint_with_mode(
182 transfer_id.clone(),
183 transport_mode,
184 0,
185 0,
186 false,
187 )),
188 partial_fetch_status: partial_fetch_status_for_repo(repo),
189 allow_partial_fetch: true,
190 thread_metadata: thread_metadata
191 .map(|metadata| to_proto_thread_metadata(&metadata)),
192 client_operation_id: String::new(),
193 })),
194 };
195
196 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
197 tx.send(request_message).await.map_err(|_| {
198 ProtocolError::InvalidState("failed to initialize push stream".to_string())
199 })?;
200 let mut request = Request::new(ReceiverStream::new(rx));
201 self.apply_auth(&mut request)?;
202 let mut response = self
203 .inner
204 .push(request)
205 .await
206 .map_err(status_to_protocol_error)?
207 .into_inner();
208
209 let ready = match response.message().await.map_err(status_to_protocol_error)? {
210 Some(PushMessage {
211 body: Some(push_message::Body::Ready(ready)),
212 }) => ready,
213 _ => {
214 return Err(ProtocolError::InvalidState(
215 "expected PushReady from gRPC server".to_string(),
216 ));
217 }
218 };
219
220 let object_index = objects
221 .into_iter()
222 .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
223 .collect::<HashMap<_, _>>();
224
225 let ready_transport_mode = ready
226 .transfer
227 .as_ref()
228 .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
229 .unwrap_or(transport_mode);
230 let wanted_infos = ready
231 .want_objects
232 .into_iter()
233 .map(|want| {
234 object_index
235 .get(&descriptor_id(&want))
236 .cloned()
237 .ok_or_else(|| {
238 ProtocolError::InvalidState("server requested unknown object".to_string())
239 })
240 })
241 .collect::<Result<Vec<_>, _>>()?;
242
243 let (wanted_redactions, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
249 .into_iter()
250 .partition(|info| info.obj_type == proto::ObjectType::Redaction);
251
252 if !wanted_packable.is_empty() {
253 let bundle = proto::build_native_pack(repo.store(), &wanted_packable)?;
254 for message in encode_native_pack_messages(
255 &bundle,
256 &transfer_id,
257 self.transport.chunk_size.max(1),
258 &self.transport,
259 ready_transport_mode,
260 )? {
261 tx.send(message).await.map_err(|_| {
262 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
263 })?;
264 }
265 }
266
267 for info in wanted_redactions {
268 let proto::ObjectId::Hash(blob) = info.id else {
269 return Err(ProtocolError::InvalidState(
270 "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
271 ));
272 };
273 let bytes = repo
278 .store()
279 .get_redactions_bytes_for_blob(&blob)
280 .map_err(|err| {
281 ProtocolError::InvalidState(format!(
282 "load redactions sidecar for {}: {err}",
283 blob.to_hex()
284 ))
285 })?
286 .ok_or_else(|| {
287 ProtocolError::InvalidState(format!(
288 "server wants redaction for blob {} but sender has no sidecar",
289 blob.to_hex()
290 ))
291 })?;
292 let message = PushMessage {
293 body: Some(push_message::Body::Redaction(RedactionTransfer {
294 blob_hash: blob.to_hex(),
295 redactions_blob: bytes,
296 })),
297 };
298 tx.send(message).await.map_err(|_| {
299 ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
300 })?;
301 }
302 drop(tx);
303
304 let result = match response.message().await.map_err(status_to_protocol_error)? {
305 Some(PushMessage {
306 body: Some(push_message::Body::Complete(complete)),
307 }) => PushComplete {
308 success: complete.success,
309 new_state: if complete.new_state.is_empty() {
310 None
311 } else {
312 Some(
313 ChangeId::parse(&complete.new_state)
314 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
315 )
316 },
317 error: (!complete.error.is_empty()).then_some(complete.error),
318 transfer_id: complete
319 .transfer
320 .as_ref()
321 .map(|transfer| transfer.transfer_id.clone())
322 .unwrap_or_default(),
323 transport_mode: complete
324 .transfer
325 .as_ref()
326 .map(|transfer| transport_mode_name(transfer.transport_mode))
327 .unwrap_or("raw-objects")
328 .to_string(),
329 resume_offset: complete
330 .transfer
331 .as_ref()
332 .map(|transfer| transfer.resume_offset)
333 .unwrap_or_default(),
334 chunk_index: complete
335 .transfer
336 .as_ref()
337 .map(|transfer| transfer.chunk_index)
338 .unwrap_or_default(),
339 checkpoint: complete
340 .transfer
341 .as_ref()
342 .map(|transfer| transfer.checkpoint.clone())
343 .unwrap_or_default(),
344 is_complete: complete
345 .transfer
346 .as_ref()
347 .map(|transfer| transfer.is_complete)
348 .unwrap_or(false),
349 },
350 _ => {
351 return Err(ProtocolError::InvalidState(
352 "expected PushComplete from gRPC server".to_string(),
353 ));
354 }
355 };
356
357 if result.success {
358 self.sync_remote_markers(repo, repo_path, local_state)
359 .await?;
360 }
361 Ok(result)
362 }
363
364 pub async fn pull(
365 &mut self,
366 repo: &Repository,
367 repo_path: &str,
368 remote_thread: &str,
369 local_thread: Option<&str>,
370 ) -> Result<PullComplete, ProtocolError> {
371 self.pull_with_options(
372 repo,
373 repo_path,
374 remote_thread,
375 PullOptions {
376 local_thread,
377 depth: None,
378 target_state: None,
379 materialization: PullMaterialization::Full,
380 },
381 )
382 .await
383 }
384
385 pub async fn pull_profiled(
386 &mut self,
387 repo: &Repository,
388 repo_path: &str,
389 remote_thread: &str,
390 local_thread: Option<&str>,
391 ) -> Result<(PullComplete, PullProfile), ProtocolError> {
392 self.pull_exchange(
393 repo,
394 repo_path,
395 remote_thread,
396 PullOptions {
397 local_thread,
398 depth: None,
399 target_state: None,
400 materialization: PullMaterialization::Full,
401 },
402 )
403 .await
404 .map(|exchange| (exchange.result, exchange.profile))
405 }
406
407 pub async fn pull_partial(
408 &mut self,
409 repo: &Repository,
410 repo_path: &str,
411 remote_thread: &str,
412 local_thread: Option<&str>,
413 ) -> Result<PullComplete, ProtocolError> {
414 self.pull_with_options(
415 repo,
416 repo_path,
417 remote_thread,
418 PullOptions {
419 local_thread,
420 depth: None,
421 target_state: None,
422 materialization: PullMaterialization::Lazy,
423 },
424 )
425 .await
426 }
427
428 pub async fn pull_with_depth_and_materialization(
429 &mut self,
430 repo: &Repository,
431 repo_path: &str,
432 remote_thread: &str,
433 local_thread: Option<&str>,
434 depth: Option<u32>,
435 materialization: PullMaterialization,
436 ) -> Result<PullComplete, ProtocolError> {
437 self.pull_with_options(
438 repo,
439 repo_path,
440 remote_thread,
441 PullOptions {
442 local_thread,
443 depth,
444 target_state: None,
445 materialization,
446 },
447 )
448 .await
449 }
450
451 pub async fn pull_with_depth(
452 &mut self,
453 repo: &Repository,
454 repo_path: &str,
455 remote_thread: &str,
456 local_thread: Option<&str>,
457 depth: Option<u32>,
458 ) -> Result<PullComplete, ProtocolError> {
459 self.pull_with_depth_and_materialization(
460 repo,
461 repo_path,
462 remote_thread,
463 local_thread,
464 depth,
465 PullMaterialization::Full,
466 )
467 .await
468 }
469
470 pub async fn fetch_state(
471 &mut self,
472 repo: &Repository,
473 repo_path: &str,
474 remote_thread: &str,
475 target_state: ChangeId,
476 ) -> Result<usize, ProtocolError> {
477 self.pull_exchange(
478 repo,
479 repo_path,
480 remote_thread,
481 PullOptions {
482 local_thread: None,
483 depth: None,
484 target_state: Some(target_state),
485 materialization: PullMaterialization::Full,
486 },
487 )
488 .await
489 .map(|exchange| exchange.object_count)
490 }
491
492 pub async fn fetch_state_partial(
493 &mut self,
494 repo: &Repository,
495 repo_path: &str,
496 remote_thread: &str,
497 target_state: ChangeId,
498 ) -> Result<usize, ProtocolError> {
499 self.pull_exchange(
500 repo,
501 repo_path,
502 remote_thread,
503 PullOptions {
504 local_thread: None,
505 depth: None,
506 target_state: Some(target_state),
507 materialization: PullMaterialization::Lazy,
508 },
509 )
510 .await
511 .map(|exchange| exchange.object_count)
512 }
513
514 pub async fn hydrate_blob_at_path(
515 &mut self,
516 repo: &Repository,
517 repo_path: &str,
518 reference: &str,
519 path: &str,
520 ) -> Result<objects::object::Blob, ProtocolError> {
521 let mut request = Request::new(GetBlobRequest {
522 repo_path: repo_path.to_string(),
523 r#ref: reference.to_string(),
524 path: path.to_string(),
525 });
526 self.apply_auth(&mut request)?;
527 let response = self
528 .content
529 .get_blob(request)
530 .await
531 .map_err(status_to_protocol_error)?
532 .into_inner();
533
534 let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
535 let blob = objects::object::Blob::new(content);
536 repo.store().put_blob(&blob)?;
537 repo.clear_missing_blob(&blob.hash())?;
538 Ok(blob)
539 }
540
541 pub async fn hydrate_missing_blobs_for_state(
542 &mut self,
543 repo: &Repository,
544 repo_path: &str,
545 remote_thread: &str,
546 target_state: ChangeId,
547 ) -> Result<usize, ProtocolError> {
548 let exchange = self
549 .pull_exchange(
550 repo,
551 repo_path,
552 remote_thread,
553 PullOptions {
554 local_thread: None,
555 depth: None,
556 target_state: Some(target_state),
557 materialization: PullMaterialization::Full,
558 },
559 )
560 .await?;
561 clear_missing_blobs_for_state(repo, target_state)?;
562 Ok(exchange.object_count)
563 }
564
565 async fn pull_with_options(
566 &mut self,
567 repo: &Repository,
568 repo_path: &str,
569 remote_thread: &str,
570 options: PullOptions<'_>,
571 ) -> Result<PullComplete, ProtocolError> {
572 self.pull_exchange(repo, repo_path, remote_thread, options)
573 .await
574 .map(|exchange| exchange.result)
575 }
576
577 async fn pull_exchange(
578 &mut self,
579 repo: &Repository,
580 repo_path: &str,
581 remote_thread: &str,
582 options: PullOptions<'_>,
583 ) -> Result<PullExchange, ProtocolError> {
584 let exchange_start = Instant::now();
585 let mut exclude_states = Vec::new();
586 if let Some(local_thread) = options.local_thread
587 && let Some(head) = repo.refs().get_thread(local_thread)?
588 {
589 exclude_states.push(head);
590 }
591 let allow_partial_fetch = options.materialization.allows_partial_fetch();
592 let fresh_full_pull =
593 supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
594
595 let transfer_id = pull_transfer_id(
596 repo_path,
597 remote_thread,
598 options.local_thread,
599 options.depth,
600 options.target_state,
601 );
602 let request_message = PullMessage {
603 body: Some(pull_message::Body::Request(PullRequest {
604 repo_path: repo_path.to_string(),
605 remote_thread: remote_thread.to_string(),
606 local_thread: options.local_thread.unwrap_or_default().to_string(),
607 target_state: options
608 .target_state
609 .map(|value| value.to_string_full())
610 .unwrap_or_default(),
611 depth: options.depth.unwrap_or_default(),
612 exclude_states: exclude_states
613 .iter()
614 .map(ChangeId::to_string_full)
615 .collect(),
616 transfer: Some(self.transport.transfer_checkpoint_with_mode(
617 transfer_id.clone(),
618 TransportMode::NativePack,
619 0,
620 0,
621 false,
622 )),
623 partial_fetch_status: partial_fetch_status_for_repo(repo),
624 allow_partial_fetch,
625 fresh_full_pull,
626 client_operation_id: String::new(),
627 })),
628 };
629
630 let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
631 tx.send(request_message).await.map_err(|_| {
632 ProtocolError::InvalidState("failed to initialize pull stream".to_string())
633 })?;
634 let mut request = Request::new(ReceiverStream::new(rx));
635 self.apply_auth(&mut request)?;
636 let mut response = self
637 .inner
638 .pull(request)
639 .await
640 .map_err(status_to_protocol_error)?
641 .into_inner();
642
643 let ready = match response.message().await.map_err(status_to_protocol_error)? {
644 Some(PullMessage {
645 body: Some(pull_message::Body::Ready(ready)),
646 }) => ready,
647 _ => {
648 return Err(ProtocolError::InvalidState(
649 "expected PullReady from gRPC server".to_string(),
650 ));
651 }
652 };
653 let mut profile = PullProfile {
654 ready_wait: exchange_start.elapsed(),
655 ..PullProfile::default()
656 };
657 let remote_state = ChangeId::parse(&ready.remote_state)
658 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
659 let PullWantPlan {
660 wants,
661 wanted_types,
662 want_full_closure,
663 } = plan_pull_wants(
664 repo,
665 &remote_state,
666 ready.full_closure_available,
667 ready.objects_to_fetch,
668 allow_partial_fetch,
669 )?;
670
671 tx.send(PullMessage {
672 body: Some(pull_message::Body::Want(WantObjects {
673 objects: wants.clone(),
674 want_full_closure,
675 transfer: Some(self.transport.transfer_checkpoint_with_mode(
676 transfer_id.clone(),
677 TransportMode::NativePack,
678 0,
679 0,
680 false,
681 )),
682 })),
683 })
684 .await
685 .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
686 drop(tx);
687
688 let receive_start = Instant::now();
689 let mut pack_state = proto::PackChunkState::default();
690 let mut received = 0usize;
691 while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
692 match message.body {
693 Some(pull_message::Body::Pack(chunk)) => {
694 profile.bytes_received =
695 profile.bytes_received.saturating_add(chunk.data.len());
696 profile.pack_bytes_received =
697 profile.pack_bytes_received.saturating_add(chunk.data.len());
698 let transfer = chunk.transfer.as_ref().ok_or_else(|| {
699 ProtocolError::InvalidState(
700 "native pack chunk missing transfer checkpoint".to_string(),
701 )
702 })?;
703 let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
704 .unwrap_or(PackStreamKind::Unspecified);
705 if stream_kind == PackStreamKind::Unspecified {
706 return Err(ProtocolError::InvalidState(
707 "native pack chunk missing stream kind".to_string(),
708 ));
709 }
710 let decode_start = Instant::now();
711 proto::receive_pack_chunk(
712 &mut pack_state,
713 stream_kind == PackStreamKind::Index,
714 transfer.resume_offset,
715 transfer.chunk_index,
716 transfer.is_complete,
717 &chunk.data,
718 chunk.is_final_chunk,
719 )?;
720 let decode_elapsed = decode_start.elapsed();
721 profile.pack_decode += decode_elapsed;
722 profile.pack_decode_apply += decode_elapsed;
723 profile.decode += decode_elapsed;
724 }
725 Some(pull_message::Body::Redaction(transfer)) => {
726 profile.bytes_received = profile
732 .bytes_received
733 .saturating_add(transfer.redactions_blob.len());
734 profile.object_mix.record(ObjectType::Redaction);
735 let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
736 ProtocolError::InvalidState(format!(
737 "RedactionTransfer.blob_hash is not a valid content hash: {err}"
738 ))
739 })?;
740 let decode_start = Instant::now();
741 repo.accept_wire_redactions(blob, &transfer.redactions_blob)
742 .map_err(|err| {
743 ProtocolError::InvalidState(format!(
744 "accept_wire_redactions for blob {}: {err}",
745 transfer.blob_hash
746 ))
747 })?;
748 let decode_elapsed = decode_start.elapsed();
749 profile.store_receive_object += decode_elapsed;
750 }
751 Some(pull_message::Body::Complete(complete)) => {
752 profile.receive_and_apply = receive_start.elapsed();
753 let final_state = if complete.new_state.is_empty() {
754 None
755 } else {
756 Some(
757 ChangeId::parse(&complete.new_state)
758 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
759 )
760 };
761
762 if complete.success {
763 if want_full_closure || !wants.is_empty() {
764 if !pack_state.is_complete() {
765 return Err(ProtocolError::InvalidState(
766 "pull completed before native pack stream finished".to_string(),
767 ));
768 }
769 let store_start = Instant::now();
770 let installed_ids = proto::install_received_pack(
771 repo.store(),
772 &pack_state.pack_data,
773 &pack_state.index_data,
774 )?;
775 profile.store_receive_object += store_start.elapsed();
776 received = installed_ids.len();
777 for id in installed_ids {
778 match (id, wanted_types.get(&id).copied()) {
779 (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
780 profile.object_mix.record(ObjectType::Blob);
781 repo.clear_missing_blob(&hash)?;
782 }
783 (_, Some(obj_type)) => {
784 profile.object_mix.record(obj_type);
785 }
786 (PackObjectId::ChangeId(_), None) => {
787 profile.object_mix.record(ObjectType::State);
788 }
789 (PackObjectId::Hash(hash), None) => {
790 let inferred =
791 infer_installed_hash_object_type(repo, &hash)?;
792 profile.object_mix.record(inferred);
793 }
794 }
795 }
796 }
797
798 let metadata_start = Instant::now();
799 if let Some(local_thread) = options.local_thread
800 && let Some(state) = final_state
801 {
802 repo.refs().set_thread(local_thread, &state)?;
803 }
804 if let Some(state) = final_state
805 && allow_partial_fetch
806 {
807 mark_missing_blobs_for_state(repo, state)?;
808 } else if final_state.is_some() {
809 let _ = repo.clear_all_missing_blobs()?;
810 }
811 let synced_markers = complete
812 .transfer
813 .as_ref()
814 .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
815 .transpose()?
816 .unwrap_or(false);
817 if !synced_markers {
818 self.sync_local_markers(repo, repo_path).await?;
819 }
820 profile.metadata_sync = metadata_start.elapsed();
821 profile.objects_received = received;
822 return Ok(PullExchange {
823 result: PullComplete {
824 success: true,
825 final_state,
826 error: None,
827 transfer_id: complete
828 .transfer
829 .as_ref()
830 .map(|transfer| transfer.transfer_id.clone())
831 .unwrap_or_default(),
832 transport_mode: complete
833 .transfer
834 .as_ref()
835 .map(|transfer| transport_mode_name(transfer.transport_mode))
836 .unwrap_or("native-pack")
837 .to_string(),
838 resume_offset: complete
839 .transfer
840 .as_ref()
841 .map(|transfer| transfer.resume_offset)
842 .unwrap_or_default(),
843 chunk_index: complete
844 .transfer
845 .as_ref()
846 .map(|transfer| transfer.chunk_index)
847 .unwrap_or_default(),
848 checkpoint: complete
849 .transfer
850 .as_ref()
851 .map(|transfer| transfer.checkpoint.clone())
852 .unwrap_or_default(),
853 is_complete: complete
854 .transfer
855 .as_ref()
856 .map(|transfer| transfer.is_complete)
857 .unwrap_or(false),
858 },
859 object_count: received,
860 profile,
861 });
862 }
863
864 profile.objects_received = received;
865 return Ok(PullExchange {
866 result: PullComplete {
867 success: false,
868 final_state,
869 error: (!complete.error.is_empty()).then_some(complete.error),
870 transfer_id: complete
871 .transfer
872 .as_ref()
873 .map(|transfer| transfer.transfer_id.clone())
874 .unwrap_or_default(),
875 transport_mode: complete
876 .transfer
877 .as_ref()
878 .map(|transfer| transport_mode_name(transfer.transport_mode))
879 .unwrap_or("native-pack")
880 .to_string(),
881 resume_offset: complete
882 .transfer
883 .as_ref()
884 .map(|transfer| transfer.resume_offset)
885 .unwrap_or_default(),
886 chunk_index: complete
887 .transfer
888 .as_ref()
889 .map(|transfer| transfer.chunk_index)
890 .unwrap_or_default(),
891 checkpoint: complete
892 .transfer
893 .as_ref()
894 .map(|transfer| transfer.checkpoint.clone())
895 .unwrap_or_default(),
896 is_complete: complete
897 .transfer
898 .as_ref()
899 .map(|transfer| transfer.is_complete)
900 .unwrap_or(false),
901 },
902 object_count: received,
903 profile,
904 });
905 }
906 _ => {}
907 }
908 }
909
910 Err(ProtocolError::InvalidState(format!(
911 "pull stream ended unexpectedly after receiving {received} packed objects"
912 )))
913 }
914}
915
916fn load_thread_metadata(
917 repo: &Repository,
918 target_thread: &str,
919 local_state: ChangeId,
920) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
921 let thread_manager = ThreadManager::new(repo.heddle_dir());
922 Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
923}
924
925fn plan_pull_wants(
926 repo: &Repository,
927 remote_state: &ChangeId,
928 full_closure_available: bool,
929 objects_to_fetch: Vec<ObjectDescriptor>,
930 allow_partial_fetch: bool,
931) -> Result<PullWantPlan, ProtocolError> {
932 if full_closure_available {
933 return Ok(PullWantPlan {
934 wants: Vec::new(),
935 wanted_types: HashMap::new(),
936 want_full_closure: true,
937 });
938 }
939 let request_full_closure =
940 should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
941 let mut wants = Vec::with_capacity(objects_to_fetch.len());
942 let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
943
944 for descriptor in objects_to_fetch {
945 let info = parse_descriptor_to_info(descriptor)?;
946 let pack_id = match &info.id {
947 proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
948 proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
949 };
950 let include = if request_full_closure {
951 true
952 } else {
953 let has = proto::has_object(repo.store(), &info)?;
954 !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
955 };
956
957 if include {
958 wanted_types.insert(pack_id, info.obj_type);
959 wants.push(object_descriptor_with_status(
960 &info,
961 ObjectAvailabilityStatus::Missing,
962 "requested by client",
963 ));
964 }
965 }
966
967 Ok(PullWantPlan {
968 wants,
969 wanted_types,
970 want_full_closure: false,
971 })
972}
973
974fn supports_compact_full_pull(
975 repo: &Repository,
976 allow_partial_fetch: bool,
977 exclude_states: &[ChangeId],
978) -> Result<bool, ProtocolError> {
979 if allow_partial_fetch || !exclude_states.is_empty() {
980 return Ok(false);
981 }
982 repo_looks_fresh(repo)
983}
984
985fn should_request_full_closure(
986 repo: &Repository,
987 remote_state: &ChangeId,
988 allow_partial_fetch: bool,
989) -> Result<bool, ProtocolError> {
990 if allow_partial_fetch || repo.store().has_state(remote_state)? {
991 return Ok(false);
992 }
993 repo_looks_fresh(repo)
994}
995
996fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
997 if repo.head()?.is_some() {
998 return Ok(false);
999 }
1000 if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1001 return Ok(false);
1002 }
1003 Ok(repo.missing_blobs()?.is_empty())
1004}
1005
1006fn infer_installed_hash_object_type(
1007 repo: &Repository,
1008 hash: &ContentHash,
1009) -> Result<ObjectType, ProtocolError> {
1010 let store = repo.store();
1011 if store.get_tree(hash)?.is_some() {
1012 return Ok(ObjectType::Tree);
1013 }
1014 if store
1015 .get_action(&objects::object::ActionId::from_hash(*hash))?
1016 .is_some()
1017 {
1018 return Ok(ObjectType::Action);
1019 }
1020 Ok(ObjectType::Blob)
1021}
1022
1023fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1024 const HEADER: &str = "heddle-markers-v1\n";
1025 if checkpoint.is_empty() {
1026 return Ok(false);
1027 }
1028 let payload = std::str::from_utf8(checkpoint)
1029 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1030 let Some(lines) = payload.strip_prefix(HEADER) else {
1031 return Ok(false);
1032 };
1033
1034 for line in lines.lines() {
1035 if line.is_empty() {
1036 continue;
1037 }
1038 let Some((name, change_id)) = line.split_once('\t') else {
1039 return Err(ProtocolError::InvalidState(
1040 "invalid marker snapshot line".to_string(),
1041 ));
1042 };
1043 let change_id = ChangeId::parse(change_id)
1044 .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1045 if !repo.store().has_state(&change_id)? {
1046 continue;
1047 }
1048 match repo.refs().get_marker(name)? {
1049 Some(existing) if existing == change_id => {}
1050 Some(existing) => repo.refs().set_marker_cas(
1051 name,
1052 refs::RefExpectation::Value(existing),
1053 &change_id,
1054 )?,
1055 None => repo.refs().create_marker(name, &change_id)?,
1056 }
1057 }
1058
1059 Ok(true)
1060}
1061
1062fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1063 if s.is_empty() {
1064 return Vec::new();
1065 }
1066 objects::object::ChangeId::parse(s)
1067 .map(|id| id.as_bytes().to_vec())
1068 .unwrap_or_default()
1069}
1070
1071fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1072 ThreadMetadata {
1073 name: metadata.thread.clone(),
1074 target_thread: metadata.target_thread.clone(),
1075 parent_thread: metadata.parent_thread.clone(),
1076 task: metadata.task.clone(),
1077 thread_mode: metadata.mode.to_string(),
1078 thread_state: metadata.state.to_string(),
1079 freshness: metadata.freshness.to_string(),
1080 base_state: change_id_string_to_bytes(&metadata.base_state),
1081 base_root: change_id_string_to_bytes(&metadata.base_root),
1082 current_state: metadata
1083 .current_state
1084 .as_deref()
1085 .map(change_id_string_to_bytes),
1086 merged_state: metadata
1087 .merged_state
1088 .as_deref()
1089 .map(change_id_string_to_bytes),
1090 changed_paths: metadata.changed_paths.clone(),
1091 impact_categories: metadata
1092 .impact_categories
1093 .iter()
1094 .map(ToString::to_string)
1095 .collect(),
1096 heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1097 promotion_suggested: metadata.promotion_suggested,
1098 verification_summary: Some(ThreadVerificationSummary {
1099 tests_passed: metadata.verification_summary.tests_passed,
1100 tests_failed: metadata
1101 .verification_summary
1102 .tests_failed
1103 .unwrap_or_default(),
1104 coverage_pct: metadata.verification_summary.coverage_pct,
1105 lint_warnings: metadata.verification_summary.lint_warnings,
1106 }),
1107 confidence_summary: Some(ThreadConfidenceSummary {
1108 value: metadata.confidence_summary.value,
1109 band: metadata
1110 .confidence_summary
1111 .band
1112 .as_ref()
1113 .map(ToString::to_string),
1114 }),
1115 integration_policy_result: Some(ThreadIntegrationPolicy {
1116 status: metadata
1117 .integration_policy_result
1118 .status
1119 .clone()
1120 .unwrap_or_default(),
1121 reason: metadata
1122 .integration_policy_result
1123 .reason
1124 .clone()
1125 .unwrap_or_default(),
1126 }),
1127 created_at: Some(prost_types::Timestamp {
1128 seconds: metadata.created_at.timestamp(),
1129 nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1130 }),
1131 updated_at: Some(prost_types::Timestamp {
1132 seconds: metadata.updated_at.timestamp(),
1133 nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1134 }),
1135 }
1136}
1137
1138struct PullExchange {
1139 result: PullComplete,
1140 object_count: usize,
1141 profile: PullProfile,
1142}
1143
1144fn mark_missing_blobs_for_state(
1145 repo: &Repository,
1146 state_id: ChangeId,
1147) -> Result<(), ProtocolError> {
1148 let state = repo
1149 .store()
1150 .get_state(&state_id)?
1151 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1152 let mut missing = collect_missing_blobs(repo, &state.tree);
1153 if let Some(context_root) = state.context.as_ref() {
1154 missing.extend(collect_missing_blobs(repo, context_root));
1155 }
1156 missing
1157 .into_iter()
1158 .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1159}
1160
1161fn clear_missing_blobs_for_state(
1162 repo: &Repository,
1163 state_id: ChangeId,
1164) -> Result<(), ProtocolError> {
1165 let state = repo
1166 .store()
1167 .get_state(&state_id)?
1168 .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1169 let mut missing = collect_missing_blobs(repo, &state.tree);
1170 if let Some(context_root) = state.context.as_ref() {
1171 missing.extend(collect_missing_blobs(repo, context_root));
1172 }
1173 missing
1174 .into_iter()
1175 .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1176}
1177
1178fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1179 let mut missing = Vec::new();
1180 collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1181 missing
1182}
1183
1184fn collect_missing_blobs_recursive(
1185 repo: &Repository,
1186 tree_hash: &ContentHash,
1187 missing: &mut Vec<ContentHash>,
1188) {
1189 let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1190 return;
1191 };
1192 for entry in tree.entries() {
1193 match entry.entry_type {
1194 objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1195 if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1196 missing.push(entry.hash);
1197 }
1198 }
1199 objects::object::EntryType::Tree => {
1200 collect_missing_blobs_recursive(repo, &entry.hash, missing);
1201 }
1202 }
1203 }
1204}
1205
1206fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1207 match repo.missing_blobs() {
1208 Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1209 Ok(_) => PartialFetchStatus::Disabled as i32,
1210 Err(_) => PartialFetchStatus::Unspecified as i32,
1211 }
1212}
1213
1214fn pull_transfer_id(
1215 repo_path: &str,
1216 remote_thread: &str,
1217 local_thread: Option<&str>,
1218 depth: Option<u32>,
1219 target_state: Option<ChangeId>,
1220) -> String {
1221 format!(
1222 "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1223 local_thread.unwrap_or_default(),
1224 target_state
1225 .map(|value| value.to_string_full())
1226 .unwrap_or_default()
1227 )
1228}
1229
1230fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1231 format!(
1232 "push:{repo_path}:{}:{target_thread}",
1233 local_state.to_string_full()
1234 )
1235}
1236
1237fn encode_native_pack_messages(
1238 bundle: &proto::NativePackBundle,
1239 transfer_id: &str,
1240 chunk_size: usize,
1241 transport: &super::helpers::HostedTransportPolicy,
1242 transport_mode: TransportMode,
1243) -> Result<Vec<PushMessage>, ProtocolError> {
1244 let mut messages = Vec::new();
1245 let chunk_size = chunk_size.max(1);
1246
1247 let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1248 for chunk_index in 0..pack_total_chunks.max(1) {
1249 let Some((start, len)) =
1250 proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1251 else {
1252 break;
1253 };
1254 messages.push(PushMessage {
1255 body: Some(push_message::Body::Pack(PackChunk {
1256 stream_kind: PackStreamKind::Pack as i32,
1257 data: bundle.pack_data[start..start + len].to_vec(),
1258 transfer: Some(transport.transfer_checkpoint_with_mode(
1259 transfer_id,
1260 transport_mode,
1261 chunk_index as u32,
1262 start as u64,
1263 chunk_index + 1 == pack_total_chunks,
1264 )),
1265 chunk_length: len as u32,
1266 is_final_chunk: chunk_index + 1 == pack_total_chunks,
1267 })),
1268 });
1269 }
1270
1271 let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1272 for chunk_index in 0..index_total_chunks.max(1) {
1273 let Some((start, len)) =
1274 proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1275 else {
1276 break;
1277 };
1278 messages.push(PushMessage {
1279 body: Some(push_message::Body::Pack(PackChunk {
1280 stream_kind: PackStreamKind::Index as i32,
1281 data: bundle.index_data[start..start + len].to_vec(),
1282 transfer: Some(transport.transfer_checkpoint_with_mode(
1283 transfer_id,
1284 transport_mode,
1285 chunk_index as u32,
1286 start as u64,
1287 chunk_index + 1 == index_total_chunks,
1288 )),
1289 chunk_length: len as u32,
1290 is_final_chunk: chunk_index + 1 == index_total_chunks,
1291 })),
1292 });
1293 }
1294 Ok(messages)
1295}
1296
1297fn preferred_transport_mode(
1298 transport: &super::helpers::HostedTransportPolicy,
1299 object_count: usize,
1300) -> TransportMode {
1301 let _ = transport;
1302 let _ = object_count;
1303 TransportMode::NativePack
1304}