Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7    GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9    ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata, ThreadVerificationSummary,
10    TransportMode, UpdateRefRequest, WantObjects, pull_message, push_message,
11};
12use objects::{
13    object::{ChangeId, ContentHash},
14    store::PackObjectId,
15};
16use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
17use repo::{Repository, SyncedThreadMetadata, ThreadManager};
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Request;
21
22use super::{
23    HostedGrpcClient, PullMaterialization,
24    helpers::{
25        descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
26        status_to_protocol_error, to_proto_object_info, transport_mode_name,
27    },
28};
29
/// Per-call knobs shared by every pull-style entry point on the client.
#[derive(Clone, Copy)]
struct PullOptions<'a> {
    /// Local thread to fast-forward. When set, its current head (if any) is
    /// sent as an excluded state and the thread is pointed at the final state
    /// after a successful pull.
    local_thread: Option<&'a str>,
    /// Depth sent in the pull request; `None` is sent as the default value.
    depth: Option<u32>,
    /// Specific state to fetch; `None` is sent as an empty string.
    target_state: Option<ChangeId>,
    /// Decides whether partial fetch is allowed (via `allows_partial_fetch()`).
    materialization: PullMaterialization,
}
37
/// Outcome of planning which objects to request after the server's `Ready`
/// message (produced by `plan_pull_wants`).
struct PullWantPlan {
    /// Object descriptors sent to the server in the `WantObjects` message.
    wants: Vec<ObjectDescriptor>,
    /// Object type per pack object id; looked up when classifying installed
    /// objects for the profile's object mix.
    wanted_types: HashMap<PackObjectId, ObjectType>,
    /// Forwarded as `want_full_closure` in the `WantObjects` message.
    want_full_closure: bool,
}
43
44#[derive(Debug, Clone, Default)]
45pub struct PullObjectMix {
46    pub blobs: usize,
47    pub trees: usize,
48    pub states: usize,
49    pub actions: usize,
50    pub redactions: usize,
51}
52
53impl PullObjectMix {
54    fn record(&mut self, obj_type: ObjectType) {
55        match obj_type {
56            ObjectType::Blob => self.blobs += 1,
57            ObjectType::Tree => self.trees += 1,
58            ObjectType::State => self.states += 1,
59            ObjectType::Action => self.actions += 1,
60            ObjectType::Redaction => self.redactions += 1,
61        }
62    }
63
64    pub fn total(&self) -> usize {
65        self.blobs + self.trees + self.states + self.actions + self.redactions
66    }
67}
68
/// Timing and volume measurements collected while running a pull exchange.
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until the server's `Ready` message
    /// was received.
    pub ready_wait: Duration,
    /// Time from the end of the want request until the server's `Complete`
    /// message arrived.
    pub receive_and_apply: Duration,
    /// Accumulated time spent feeding received chunks to the pack decoder.
    pub decode: Duration,
    /// Accumulated time spent installing the received pack into the store.
    pub store_receive_object: Duration,
    /// Time spent after a successful pull on the thread ref, missing-blob
    /// markers and marker sync.
    pub metadata_sync: Duration,
    /// Accumulated per-chunk pack decode time (same increments as
    /// `pack_decode` in the pull path visible in this file).
    pub pack_decode_apply: Duration,
    /// NOTE(review): not written by the pull path visible in this file;
    /// presumably reserved for a raw-object transport — confirm.
    pub raw_decode_apply: Duration,
    /// Accumulated per-chunk pack decode time.
    pub pack_decode: Duration,
    /// NOTE(review): not written by the pull path visible in this file — confirm.
    pub raw_decode: Duration,
    /// Total payload bytes received across all chunks (saturating).
    pub bytes_received: usize,
    /// Payload bytes received in native-pack chunks (saturating).
    pub pack_bytes_received: usize,
    /// NOTE(review): not written by the pull path visible in this file — confirm.
    pub raw_bytes_received: usize,
    /// Number of object ids installed from the received pack.
    pub objects_received: usize,
    /// Breakdown of installed objects by type.
    pub object_mix: PullObjectMix,
}
86
87impl HostedGrpcClient {
88    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
89        let mut request = Request::new(ListRefsRequest {
90            repo_path: repo_path.to_string(),
91        });
92        self.apply_auth(&mut request)?;
93        let response = self
94            .inner
95            .list_refs(request)
96            .await
97            .map_err(status_to_protocol_error)?
98            .into_inner();
99        response
100            .refs
101            .into_iter()
102            .map(|entry| {
103                Ok(RefEntry {
104                    name: entry.name,
105                    change_id: ChangeId::try_from_slice(&entry.change_id)
106                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
107                    is_thread: entry.is_thread,
108                })
109            })
110            .collect()
111    }
112
113    #[allow(clippy::too_many_arguments)]
114    pub async fn update_ref(
115        &mut self,
116        repo_path: &str,
117        name: &str,
118        is_thread: bool,
119        old_value: Option<ChangeId>,
120        new_value: ChangeId,
121        force: bool,
122        thread_metadata: Option<&SyncedThreadMetadata>,
123    ) -> Result<RefUpdated, ProtocolError> {
124        let mut request = Request::new(UpdateRefRequest {
125            repo_path: repo_path.to_string(),
126            name: name.to_string(),
127            is_thread,
128            force,
129            old_value: old_value
130                .map(|value| value.to_string_full())
131                .unwrap_or_default(),
132            new_value: new_value.to_string_full(),
133            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
134            client_operation_id: String::new(),
135        });
136        self.apply_auth(&mut request)?;
137        let response = self
138            .inner
139            .update_ref(request)
140            .await
141            .map_err(status_to_protocol_error)?
142            .into_inner();
143        Ok(RefUpdated {
144            success: response.success,
145            old_value: if response.old_value.is_empty() {
146                None
147            } else {
148                Some(
149                    ChangeId::parse(&response.old_value)
150                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
151                )
152            },
153            error: (!response.error.is_empty()).then_some(response.error),
154        })
155    }
156
157    pub async fn push(
158        &mut self,
159        repo: &Repository,
160        repo_path: &str,
161        local_state: ChangeId,
162        target_thread: &str,
163        force: bool,
164    ) -> Result<PushComplete, ProtocolError> {
165        let _ = self.transport.chunk_size;
166        let _ = self.transport.resume_attempts;
167        let _ = self.transport.negotiated.chunk_size();
168        let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
169        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
170        let transport_mode = preferred_transport_mode(&self.transport, objects.len());
171        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
172        let request_message = PushMessage {
173            body: Some(push_message::Body::Request(PushRequest {
174                repo_path: repo_path.to_string(),
175                local_state: local_state.to_string_full(),
176                target_thread: target_thread.to_string(),
177                create_thread: true,
178                force,
179                objects: objects.iter().map(to_proto_object_info).collect(),
180                transfer: Some(self.transport.transfer_checkpoint_with_mode(
181                    transfer_id.clone(),
182                    transport_mode,
183                    0,
184                    0,
185                    false,
186                )),
187                partial_fetch_status: partial_fetch_status_for_repo(repo),
188                allow_partial_fetch: true,
189                thread_metadata: thread_metadata
190                    .map(|metadata| to_proto_thread_metadata(&metadata)),
191                client_operation_id: String::new(),
192            })),
193        };
194
195        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
196        tx.send(request_message).await.map_err(|_| {
197            ProtocolError::InvalidState("failed to initialize push stream".to_string())
198        })?;
199        let mut request = Request::new(ReceiverStream::new(rx));
200        self.apply_auth(&mut request)?;
201        let mut response = self
202            .inner
203            .push(request)
204            .await
205            .map_err(status_to_protocol_error)?
206            .into_inner();
207
208        let ready = match response.message().await.map_err(status_to_protocol_error)? {
209            Some(PushMessage {
210                body: Some(push_message::Body::Ready(ready)),
211            }) => ready,
212            _ => {
213                return Err(ProtocolError::InvalidState(
214                    "expected PushReady from gRPC server".to_string(),
215                ));
216            }
217        };
218
219        let object_index = objects
220            .into_iter()
221            .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
222            .collect::<HashMap<_, _>>();
223
224        let ready_transport_mode = ready
225            .transfer
226            .as_ref()
227            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
228            .unwrap_or(transport_mode);
229        let wanted_infos = ready
230            .want_objects
231            .into_iter()
232            .map(|want| {
233                object_index
234                    .get(&descriptor_id(&want))
235                    .cloned()
236                    .ok_or_else(|| {
237                        ProtocolError::InvalidState("server requested unknown object".to_string())
238                    })
239            })
240            .collect::<Result<Vec<_>, _>>()?;
241
242        if !wanted_infos.is_empty() {
243            let bundle = proto::build_native_pack(repo.store(), &wanted_infos)?;
244            for message in encode_native_pack_messages(
245                &bundle,
246                &transfer_id,
247                self.transport.chunk_size.max(1),
248                &self.transport,
249                ready_transport_mode,
250            )? {
251                tx.send(message).await.map_err(|_| {
252                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
253                })?;
254            }
255        }
256        drop(tx);
257
258        let result = match response.message().await.map_err(status_to_protocol_error)? {
259            Some(PushMessage {
260                body: Some(push_message::Body::Complete(complete)),
261            }) => PushComplete {
262                success: complete.success,
263                new_state: if complete.new_state.is_empty() {
264                    None
265                } else {
266                    Some(
267                        ChangeId::parse(&complete.new_state)
268                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
269                    )
270                },
271                error: (!complete.error.is_empty()).then_some(complete.error),
272                transfer_id: complete
273                    .transfer
274                    .as_ref()
275                    .map(|transfer| transfer.transfer_id.clone())
276                    .unwrap_or_default(),
277                transport_mode: complete
278                    .transfer
279                    .as_ref()
280                    .map(|transfer| transport_mode_name(transfer.transport_mode))
281                    .unwrap_or("raw-objects")
282                    .to_string(),
283                resume_offset: complete
284                    .transfer
285                    .as_ref()
286                    .map(|transfer| transfer.resume_offset)
287                    .unwrap_or_default(),
288                chunk_index: complete
289                    .transfer
290                    .as_ref()
291                    .map(|transfer| transfer.chunk_index)
292                    .unwrap_or_default(),
293                checkpoint: complete
294                    .transfer
295                    .as_ref()
296                    .map(|transfer| transfer.checkpoint.clone())
297                    .unwrap_or_default(),
298                is_complete: complete
299                    .transfer
300                    .as_ref()
301                    .map(|transfer| transfer.is_complete)
302                    .unwrap_or(false),
303            },
304            _ => {
305                return Err(ProtocolError::InvalidState(
306                    "expected PushComplete from gRPC server".to_string(),
307                ));
308            }
309        };
310
311        if result.success {
312            self.sync_remote_markers(repo, repo_path, local_state)
313                .await?;
314        }
315        Ok(result)
316    }
317
318    pub async fn pull(
319        &mut self,
320        repo: &Repository,
321        repo_path: &str,
322        remote_thread: &str,
323        local_thread: Option<&str>,
324    ) -> Result<PullComplete, ProtocolError> {
325        self.pull_with_options(
326            repo,
327            repo_path,
328            remote_thread,
329            PullOptions {
330                local_thread,
331                depth: None,
332                target_state: None,
333                materialization: PullMaterialization::Full,
334            },
335        )
336        .await
337    }
338
339    pub async fn pull_profiled(
340        &mut self,
341        repo: &Repository,
342        repo_path: &str,
343        remote_thread: &str,
344        local_thread: Option<&str>,
345    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
346        self.pull_exchange(
347            repo,
348            repo_path,
349            remote_thread,
350            PullOptions {
351                local_thread,
352                depth: None,
353                target_state: None,
354                materialization: PullMaterialization::Full,
355            },
356        )
357        .await
358        .map(|exchange| (exchange.result, exchange.profile))
359    }
360
361    pub async fn pull_partial(
362        &mut self,
363        repo: &Repository,
364        repo_path: &str,
365        remote_thread: &str,
366        local_thread: Option<&str>,
367    ) -> Result<PullComplete, ProtocolError> {
368        self.pull_with_options(
369            repo,
370            repo_path,
371            remote_thread,
372            PullOptions {
373                local_thread,
374                depth: None,
375                target_state: None,
376                materialization: PullMaterialization::Lazy,
377            },
378        )
379        .await
380    }
381
382    pub async fn pull_with_depth_and_materialization(
383        &mut self,
384        repo: &Repository,
385        repo_path: &str,
386        remote_thread: &str,
387        local_thread: Option<&str>,
388        depth: Option<u32>,
389        materialization: PullMaterialization,
390    ) -> Result<PullComplete, ProtocolError> {
391        self.pull_with_options(
392            repo,
393            repo_path,
394            remote_thread,
395            PullOptions {
396                local_thread,
397                depth,
398                target_state: None,
399                materialization,
400            },
401        )
402        .await
403    }
404
405    pub async fn pull_with_depth(
406        &mut self,
407        repo: &Repository,
408        repo_path: &str,
409        remote_thread: &str,
410        local_thread: Option<&str>,
411        depth: Option<u32>,
412    ) -> Result<PullComplete, ProtocolError> {
413        self.pull_with_depth_and_materialization(
414            repo,
415            repo_path,
416            remote_thread,
417            local_thread,
418            depth,
419            PullMaterialization::Full,
420        )
421        .await
422    }
423
424    pub async fn fetch_state(
425        &mut self,
426        repo: &Repository,
427        repo_path: &str,
428        remote_thread: &str,
429        target_state: ChangeId,
430    ) -> Result<usize, ProtocolError> {
431        self.pull_exchange(
432            repo,
433            repo_path,
434            remote_thread,
435            PullOptions {
436                local_thread: None,
437                depth: None,
438                target_state: Some(target_state),
439                materialization: PullMaterialization::Full,
440            },
441        )
442        .await
443        .map(|exchange| exchange.object_count)
444    }
445
446    pub async fn fetch_state_partial(
447        &mut self,
448        repo: &Repository,
449        repo_path: &str,
450        remote_thread: &str,
451        target_state: ChangeId,
452    ) -> Result<usize, ProtocolError> {
453        self.pull_exchange(
454            repo,
455            repo_path,
456            remote_thread,
457            PullOptions {
458                local_thread: None,
459                depth: None,
460                target_state: Some(target_state),
461                materialization: PullMaterialization::Lazy,
462            },
463        )
464        .await
465        .map(|exchange| exchange.object_count)
466    }
467
468    pub async fn hydrate_blob_at_path(
469        &mut self,
470        repo: &Repository,
471        repo_path: &str,
472        reference: &str,
473        path: &str,
474    ) -> Result<objects::object::Blob, ProtocolError> {
475        let mut request = Request::new(GetBlobRequest {
476            repo_path: repo_path.to_string(),
477            r#ref: reference.to_string(),
478            path: path.to_string(),
479        });
480        self.apply_auth(&mut request)?;
481        let response = self
482            .content
483            .get_blob(request)
484            .await
485            .map_err(status_to_protocol_error)?
486            .into_inner();
487
488        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
489        let blob = objects::object::Blob::new(content);
490        repo.store().put_blob(&blob)?;
491        repo.clear_missing_blob(&blob.hash())?;
492        Ok(blob)
493    }
494
495    pub async fn hydrate_missing_blobs_for_state(
496        &mut self,
497        repo: &Repository,
498        repo_path: &str,
499        remote_thread: &str,
500        target_state: ChangeId,
501    ) -> Result<usize, ProtocolError> {
502        let exchange = self
503            .pull_exchange(
504                repo,
505                repo_path,
506                remote_thread,
507                PullOptions {
508                    local_thread: None,
509                    depth: None,
510                    target_state: Some(target_state),
511                    materialization: PullMaterialization::Full,
512                },
513            )
514            .await?;
515        clear_missing_blobs_for_state(repo, target_state)?;
516        Ok(exchange.object_count)
517    }
518
519    async fn pull_with_options(
520        &mut self,
521        repo: &Repository,
522        repo_path: &str,
523        remote_thread: &str,
524        options: PullOptions<'_>,
525    ) -> Result<PullComplete, ProtocolError> {
526        self.pull_exchange(repo, repo_path, remote_thread, options)
527            .await
528            .map(|exchange| exchange.result)
529    }
530
531    async fn pull_exchange(
532        &mut self,
533        repo: &Repository,
534        repo_path: &str,
535        remote_thread: &str,
536        options: PullOptions<'_>,
537    ) -> Result<PullExchange, ProtocolError> {
538        let exchange_start = Instant::now();
539        let mut exclude_states = Vec::new();
540        if let Some(local_thread) = options.local_thread
541            && let Some(head) = repo.refs().get_thread(local_thread)?
542        {
543            exclude_states.push(head);
544        }
545        let allow_partial_fetch = options.materialization.allows_partial_fetch();
546        let fresh_full_pull =
547            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
548
549        let transfer_id = pull_transfer_id(
550            repo_path,
551            remote_thread,
552            options.local_thread,
553            options.depth,
554            options.target_state,
555        );
556        let request_message = PullMessage {
557            body: Some(pull_message::Body::Request(PullRequest {
558                repo_path: repo_path.to_string(),
559                remote_thread: remote_thread.to_string(),
560                local_thread: options.local_thread.unwrap_or_default().to_string(),
561                target_state: options
562                    .target_state
563                    .map(|value| value.to_string_full())
564                    .unwrap_or_default(),
565                depth: options.depth.unwrap_or_default(),
566                exclude_states: exclude_states
567                    .iter()
568                    .map(ChangeId::to_string_full)
569                    .collect(),
570                transfer: Some(self.transport.transfer_checkpoint_with_mode(
571                    transfer_id.clone(),
572                    TransportMode::NativePack,
573                    0,
574                    0,
575                    false,
576                )),
577                partial_fetch_status: partial_fetch_status_for_repo(repo),
578                allow_partial_fetch,
579                fresh_full_pull,
580                client_operation_id: String::new(),
581            })),
582        };
583
584        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
585        tx.send(request_message).await.map_err(|_| {
586            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
587        })?;
588        let mut request = Request::new(ReceiverStream::new(rx));
589        self.apply_auth(&mut request)?;
590        let mut response = self
591            .inner
592            .pull(request)
593            .await
594            .map_err(status_to_protocol_error)?
595            .into_inner();
596
597        let ready = match response.message().await.map_err(status_to_protocol_error)? {
598            Some(PullMessage {
599                body: Some(pull_message::Body::Ready(ready)),
600            }) => ready,
601            _ => {
602                return Err(ProtocolError::InvalidState(
603                    "expected PullReady from gRPC server".to_string(),
604                ));
605            }
606        };
607        let mut profile = PullProfile {
608            ready_wait: exchange_start.elapsed(),
609            ..PullProfile::default()
610        };
611        let remote_state = ChangeId::parse(&ready.remote_state)
612            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
613        let PullWantPlan {
614            wants,
615            wanted_types,
616            want_full_closure,
617        } = plan_pull_wants(
618            repo,
619            &remote_state,
620            ready.full_closure_available,
621            ready.objects_to_fetch,
622            allow_partial_fetch,
623        )?;
624
625        tx.send(PullMessage {
626            body: Some(pull_message::Body::Want(WantObjects {
627                objects: wants.clone(),
628                want_full_closure,
629                transfer: Some(self.transport.transfer_checkpoint_with_mode(
630                    transfer_id.clone(),
631                    TransportMode::NativePack,
632                    0,
633                    0,
634                    false,
635                )),
636            })),
637        })
638        .await
639        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
640        drop(tx);
641
642        let receive_start = Instant::now();
643        let mut pack_state = proto::PackChunkState::default();
644        let mut received = 0usize;
645        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
646            match message.body {
647                Some(pull_message::Body::Pack(chunk)) => {
648                    profile.bytes_received =
649                        profile.bytes_received.saturating_add(chunk.data.len());
650                    profile.pack_bytes_received =
651                        profile.pack_bytes_received.saturating_add(chunk.data.len());
652                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
653                        ProtocolError::InvalidState(
654                            "native pack chunk missing transfer checkpoint".to_string(),
655                        )
656                    })?;
657                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
658                        .unwrap_or(PackStreamKind::Unspecified);
659                    if stream_kind == PackStreamKind::Unspecified {
660                        return Err(ProtocolError::InvalidState(
661                            "native pack chunk missing stream kind".to_string(),
662                        ));
663                    }
664                    let decode_start = Instant::now();
665                    proto::receive_pack_chunk(
666                        &mut pack_state,
667                        stream_kind == PackStreamKind::Index,
668                        transfer.resume_offset,
669                        transfer.chunk_index,
670                        transfer.is_complete,
671                        &chunk.data,
672                        chunk.is_final_chunk,
673                    )?;
674                    let decode_elapsed = decode_start.elapsed();
675                    profile.pack_decode += decode_elapsed;
676                    profile.pack_decode_apply += decode_elapsed;
677                    profile.decode += decode_elapsed;
678                }
679                Some(pull_message::Body::Complete(complete)) => {
680                    profile.receive_and_apply = receive_start.elapsed();
681                    let final_state = if complete.new_state.is_empty() {
682                        None
683                    } else {
684                        Some(
685                            ChangeId::parse(&complete.new_state)
686                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
687                        )
688                    };
689
690                    if complete.success {
691                        if want_full_closure || !wants.is_empty() {
692                            if !pack_state.is_complete() {
693                                return Err(ProtocolError::InvalidState(
694                                    "pull completed before native pack stream finished".to_string(),
695                                ));
696                            }
697                            let store_start = Instant::now();
698                            let installed_ids = proto::install_received_pack(
699                                repo.store(),
700                                &pack_state.pack_data,
701                                &pack_state.index_data,
702                            )?;
703                            profile.store_receive_object += store_start.elapsed();
704                            received = installed_ids.len();
705                            for id in installed_ids {
706                                match (id, wanted_types.get(&id).copied()) {
707                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
708                                        profile.object_mix.record(ObjectType::Blob);
709                                        repo.clear_missing_blob(&hash)?;
710                                    }
711                                    (_, Some(obj_type)) => {
712                                        profile.object_mix.record(obj_type);
713                                    }
714                                    (PackObjectId::ChangeId(_), None) => {
715                                        profile.object_mix.record(ObjectType::State);
716                                    }
717                                    (PackObjectId::Hash(hash), None) => {
718                                        let inferred =
719                                            infer_installed_hash_object_type(repo, &hash)?;
720                                        profile.object_mix.record(inferred);
721                                    }
722                                }
723                            }
724                        }
725
726                        let metadata_start = Instant::now();
727                        if let Some(local_thread) = options.local_thread
728                            && let Some(state) = final_state
729                        {
730                            repo.refs().set_thread(local_thread, &state)?;
731                        }
732                        if let Some(state) = final_state
733                            && allow_partial_fetch
734                        {
735                            mark_missing_blobs_for_state(repo, state)?;
736                        } else if final_state.is_some() {
737                            let _ = repo.clear_all_missing_blobs()?;
738                        }
739                        let synced_markers = complete
740                            .transfer
741                            .as_ref()
742                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
743                            .transpose()?
744                            .unwrap_or(false);
745                        if !synced_markers {
746                            self.sync_local_markers(repo, repo_path).await?;
747                        }
748                        profile.metadata_sync = metadata_start.elapsed();
749                        profile.objects_received = received;
750                        return Ok(PullExchange {
751                            result: PullComplete {
752                                success: true,
753                                final_state,
754                                error: None,
755                                transfer_id: complete
756                                    .transfer
757                                    .as_ref()
758                                    .map(|transfer| transfer.transfer_id.clone())
759                                    .unwrap_or_default(),
760                                transport_mode: complete
761                                    .transfer
762                                    .as_ref()
763                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
764                                    .unwrap_or("native-pack")
765                                    .to_string(),
766                                resume_offset: complete
767                                    .transfer
768                                    .as_ref()
769                                    .map(|transfer| transfer.resume_offset)
770                                    .unwrap_or_default(),
771                                chunk_index: complete
772                                    .transfer
773                                    .as_ref()
774                                    .map(|transfer| transfer.chunk_index)
775                                    .unwrap_or_default(),
776                                checkpoint: complete
777                                    .transfer
778                                    .as_ref()
779                                    .map(|transfer| transfer.checkpoint.clone())
780                                    .unwrap_or_default(),
781                                is_complete: complete
782                                    .transfer
783                                    .as_ref()
784                                    .map(|transfer| transfer.is_complete)
785                                    .unwrap_or(false),
786                            },
787                            object_count: received,
788                            profile,
789                        });
790                    }
791
792                    profile.objects_received = received;
793                    return Ok(PullExchange {
794                        result: PullComplete {
795                            success: false,
796                            final_state,
797                            error: (!complete.error.is_empty()).then_some(complete.error),
798                            transfer_id: complete
799                                .transfer
800                                .as_ref()
801                                .map(|transfer| transfer.transfer_id.clone())
802                                .unwrap_or_default(),
803                            transport_mode: complete
804                                .transfer
805                                .as_ref()
806                                .map(|transfer| transport_mode_name(transfer.transport_mode))
807                                .unwrap_or("native-pack")
808                                .to_string(),
809                            resume_offset: complete
810                                .transfer
811                                .as_ref()
812                                .map(|transfer| transfer.resume_offset)
813                                .unwrap_or_default(),
814                            chunk_index: complete
815                                .transfer
816                                .as_ref()
817                                .map(|transfer| transfer.chunk_index)
818                                .unwrap_or_default(),
819                            checkpoint: complete
820                                .transfer
821                                .as_ref()
822                                .map(|transfer| transfer.checkpoint.clone())
823                                .unwrap_or_default(),
824                            is_complete: complete
825                                .transfer
826                                .as_ref()
827                                .map(|transfer| transfer.is_complete)
828                                .unwrap_or(false),
829                        },
830                        object_count: received,
831                        profile,
832                    });
833                }
834                _ => {}
835            }
836        }
837
838        Err(ProtocolError::InvalidState(format!(
839            "pull stream ended unexpectedly after receiving {received} packed objects"
840        )))
841    }
842}
843
844fn load_thread_metadata(
845    repo: &Repository,
846    target_thread: &str,
847    local_state: ChangeId,
848) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
849    let thread_manager = ThreadManager::new(repo.heddle_dir());
850    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
851}
852
853fn plan_pull_wants(
854    repo: &Repository,
855    remote_state: &ChangeId,
856    full_closure_available: bool,
857    objects_to_fetch: Vec<ObjectDescriptor>,
858    allow_partial_fetch: bool,
859) -> Result<PullWantPlan, ProtocolError> {
860    if full_closure_available {
861        return Ok(PullWantPlan {
862            wants: Vec::new(),
863            wanted_types: HashMap::new(),
864            want_full_closure: true,
865        });
866    }
867    let request_full_closure =
868        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
869    let mut wants = Vec::with_capacity(objects_to_fetch.len());
870    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
871
872    for descriptor in objects_to_fetch {
873        let info = parse_descriptor_to_info(descriptor)?;
874        let pack_id = match &info.id {
875            proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
876            proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
877        };
878        let include = if request_full_closure {
879            true
880        } else {
881            let has = proto::has_object(repo.store(), &info)?;
882            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
883        };
884
885        if include {
886            wanted_types.insert(pack_id, info.obj_type);
887            wants.push(object_descriptor_with_status(
888                &info,
889                ObjectAvailabilityStatus::Missing,
890                "requested by client",
891            ));
892        }
893    }
894
895    Ok(PullWantPlan {
896        wants,
897        wanted_types,
898        want_full_closure: false,
899    })
900}
901
902fn supports_compact_full_pull(
903    repo: &Repository,
904    allow_partial_fetch: bool,
905    exclude_states: &[ChangeId],
906) -> Result<bool, ProtocolError> {
907    if allow_partial_fetch || !exclude_states.is_empty() {
908        return Ok(false);
909    }
910    repo_looks_fresh(repo)
911}
912
913fn should_request_full_closure(
914    repo: &Repository,
915    remote_state: &ChangeId,
916    allow_partial_fetch: bool,
917) -> Result<bool, ProtocolError> {
918    if allow_partial_fetch || repo.store().has_state(remote_state)? {
919        return Ok(false);
920    }
921    repo_looks_fresh(repo)
922}
923
924fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
925    if repo.head()?.is_some() {
926        return Ok(false);
927    }
928    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
929        return Ok(false);
930    }
931    Ok(repo.missing_blobs()?.is_empty())
932}
933
934fn infer_installed_hash_object_type(
935    repo: &Repository,
936    hash: &ContentHash,
937) -> Result<ObjectType, ProtocolError> {
938    let store = repo.store();
939    if store.get_tree(hash)?.is_some() {
940        return Ok(ObjectType::Tree);
941    }
942    if store
943        .get_action(&objects::object::ActionId::from_hash(*hash))?
944        .is_some()
945    {
946        return Ok(ObjectType::Action);
947    }
948    Ok(ObjectType::Blob)
949}
950
951fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
952    const HEADER: &str = "heddle-markers-v1\n";
953    if checkpoint.is_empty() {
954        return Ok(false);
955    }
956    let payload = std::str::from_utf8(checkpoint)
957        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
958    let Some(lines) = payload.strip_prefix(HEADER) else {
959        return Ok(false);
960    };
961
962    for line in lines.lines() {
963        if line.is_empty() {
964            continue;
965        }
966        let Some((name, change_id)) = line.split_once('\t') else {
967            return Err(ProtocolError::InvalidState(
968                "invalid marker snapshot line".to_string(),
969            ));
970        };
971        let change_id = ChangeId::parse(change_id)
972            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
973        if !repo.store().has_state(&change_id)? {
974            continue;
975        }
976        match repo.refs().get_marker(name)? {
977            Some(existing) if existing == change_id => {}
978            Some(existing) => repo.refs().set_marker_cas(
979                name,
980                refs::RefExpectation::Value(existing),
981                &change_id,
982            )?,
983            None => repo.refs().create_marker(name, &change_id)?,
984        }
985    }
986
987    Ok(true)
988}
989
990fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
991    if s.is_empty() {
992        return Vec::new();
993    }
994    objects::object::ChangeId::parse(s)
995        .map(|id| id.as_bytes().to_vec())
996        .unwrap_or_default()
997}
998
999fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1000    ThreadMetadata {
1001        name: metadata.thread.clone(),
1002        target_thread: metadata.target_thread.clone(),
1003        parent_thread: metadata.parent_thread.clone(),
1004        task: metadata.task.clone(),
1005        thread_mode: metadata.mode.to_string(),
1006        thread_state: metadata.state.to_string(),
1007        freshness: metadata.freshness.to_string(),
1008        base_state: change_id_string_to_bytes(&metadata.base_state),
1009        base_root: change_id_string_to_bytes(&metadata.base_root),
1010        current_state: metadata
1011            .current_state
1012            .as_deref()
1013            .map(change_id_string_to_bytes),
1014        merged_state: metadata
1015            .merged_state
1016            .as_deref()
1017            .map(change_id_string_to_bytes),
1018        changed_paths: metadata.changed_paths.clone(),
1019        impact_categories: metadata
1020            .impact_categories
1021            .iter()
1022            .map(ToString::to_string)
1023            .collect(),
1024        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1025        promotion_suggested: metadata.promotion_suggested,
1026        verification_summary: Some(ThreadVerificationSummary {
1027            tests_passed: metadata.verification_summary.tests_passed,
1028            tests_failed: metadata
1029                .verification_summary
1030                .tests_failed
1031                .unwrap_or_default(),
1032            coverage_pct: metadata.verification_summary.coverage_pct,
1033            lint_warnings: metadata.verification_summary.lint_warnings,
1034        }),
1035        confidence_summary: Some(ThreadConfidenceSummary {
1036            value: metadata.confidence_summary.value,
1037            band: metadata
1038                .confidence_summary
1039                .band
1040                .as_ref()
1041                .map(ToString::to_string),
1042        }),
1043        integration_policy_result: Some(ThreadIntegrationPolicy {
1044            status: metadata
1045                .integration_policy_result
1046                .status
1047                .clone()
1048                .unwrap_or_default(),
1049            reason: metadata
1050                .integration_policy_result
1051                .reason
1052                .clone()
1053                .unwrap_or_default(),
1054        }),
1055        created_at: Some(prost_types::Timestamp {
1056            seconds: metadata.created_at.timestamp(),
1057            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1058        }),
1059        updated_at: Some(prost_types::Timestamp {
1060            seconds: metadata.updated_at.timestamp(),
1061            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1062        }),
1063    }
1064}
1065
1066struct PullExchange {
1067    result: PullComplete,
1068    object_count: usize,
1069    profile: PullProfile,
1070}
1071
1072fn mark_missing_blobs_for_state(
1073    repo: &Repository,
1074    state_id: ChangeId,
1075) -> Result<(), ProtocolError> {
1076    let state = repo
1077        .store()
1078        .get_state(&state_id)?
1079        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1080    let mut missing = collect_missing_blobs(repo, &state.tree);
1081    if let Some(context_root) = state.context.as_ref() {
1082        missing.extend(collect_missing_blobs(repo, context_root));
1083    }
1084    missing
1085        .into_iter()
1086        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1087}
1088
1089fn clear_missing_blobs_for_state(
1090    repo: &Repository,
1091    state_id: ChangeId,
1092) -> Result<(), ProtocolError> {
1093    let state = repo
1094        .store()
1095        .get_state(&state_id)?
1096        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1097    let mut missing = collect_missing_blobs(repo, &state.tree);
1098    if let Some(context_root) = state.context.as_ref() {
1099        missing.extend(collect_missing_blobs(repo, context_root));
1100    }
1101    missing
1102        .into_iter()
1103        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1104}
1105
1106fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1107    let mut missing = Vec::new();
1108    collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1109    missing
1110}
1111
1112fn collect_missing_blobs_recursive(
1113    repo: &Repository,
1114    tree_hash: &ContentHash,
1115    missing: &mut Vec<ContentHash>,
1116) {
1117    let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1118        return;
1119    };
1120    for entry in tree.entries() {
1121        match entry.entry_type {
1122            objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1123                if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1124                    missing.push(entry.hash);
1125                }
1126            }
1127            objects::object::EntryType::Tree => {
1128                collect_missing_blobs_recursive(repo, &entry.hash, missing);
1129            }
1130        }
1131    }
1132}
1133
1134fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1135    match repo.missing_blobs() {
1136        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1137        Ok(_) => PartialFetchStatus::Disabled as i32,
1138        Err(_) => PartialFetchStatus::Unspecified as i32,
1139    }
1140}
1141
1142fn pull_transfer_id(
1143    repo_path: &str,
1144    remote_thread: &str,
1145    local_thread: Option<&str>,
1146    depth: Option<u32>,
1147    target_state: Option<ChangeId>,
1148) -> String {
1149    format!(
1150        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1151        local_thread.unwrap_or_default(),
1152        target_state
1153            .map(|value| value.to_string_full())
1154            .unwrap_or_default()
1155    )
1156}
1157
1158fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1159    format!(
1160        "push:{repo_path}:{}:{target_thread}",
1161        local_state.to_string_full()
1162    )
1163}
1164
1165fn encode_native_pack_messages(
1166    bundle: &proto::NativePackBundle,
1167    transfer_id: &str,
1168    chunk_size: usize,
1169    transport: &super::helpers::HostedTransportPolicy,
1170    transport_mode: TransportMode,
1171) -> Result<Vec<PushMessage>, ProtocolError> {
1172    let mut messages = Vec::new();
1173    let chunk_size = chunk_size.max(1);
1174
1175    let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1176    for chunk_index in 0..pack_total_chunks.max(1) {
1177        let Some((start, len)) =
1178            proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1179        else {
1180            break;
1181        };
1182        messages.push(PushMessage {
1183            body: Some(push_message::Body::Pack(PackChunk {
1184                stream_kind: PackStreamKind::Pack as i32,
1185                data: bundle.pack_data[start..start + len].to_vec(),
1186                transfer: Some(transport.transfer_checkpoint_with_mode(
1187                    transfer_id,
1188                    transport_mode,
1189                    chunk_index as u32,
1190                    start as u64,
1191                    chunk_index + 1 == pack_total_chunks,
1192                )),
1193                chunk_length: len as u32,
1194                is_final_chunk: chunk_index + 1 == pack_total_chunks,
1195            })),
1196        });
1197    }
1198
1199    let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1200    for chunk_index in 0..index_total_chunks.max(1) {
1201        let Some((start, len)) =
1202            proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1203        else {
1204            break;
1205        };
1206        messages.push(PushMessage {
1207            body: Some(push_message::Body::Pack(PackChunk {
1208                stream_kind: PackStreamKind::Index as i32,
1209                data: bundle.index_data[start..start + len].to_vec(),
1210                transfer: Some(transport.transfer_checkpoint_with_mode(
1211                    transfer_id,
1212                    transport_mode,
1213                    chunk_index as u32,
1214                    start as u64,
1215                    chunk_index + 1 == index_total_chunks,
1216                )),
1217                chunk_length: len as u32,
1218                is_final_chunk: chunk_index + 1 == index_total_chunks,
1219            })),
1220        });
1221    }
1222    Ok(messages)
1223}
1224
1225fn preferred_transport_mode(
1226    transport: &super::helpers::HostedTransportPolicy,
1227    object_count: usize,
1228) -> TransportMode {
1229    let _ = transport;
1230    let _ = object_count;
1231    TransportMode::NativePack
1232}