Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7    GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9    ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata, ThreadVerificationSummary,
10    TransportMode, UpdateRefRequest, WantObjects, pull_message, push_message,
11};
12use objects::{
13    object::{ChangeId, ContentHash},
14    store::PackObjectId,
15};
16use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
17use repo::{Repository, SyncedThreadMetadata, ThreadManager};
18use tokio::sync::mpsc;
19use tokio_stream::wrappers::ReceiverStream;
20use tonic::Request;
21
22use super::{
23    HostedGrpcClient, PullMaterialization,
24    helpers::{
25        descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
26        status_to_protocol_error, to_proto_object_info, transport_mode_name,
27    },
28};
29
30#[derive(Clone, Copy)]
31struct PullOptions<'a> {
32    local_thread: Option<&'a str>,
33    depth: Option<u32>,
34    target_state: Option<ChangeId>,
35    materialization: PullMaterialization,
36}
37
38struct PullWantPlan {
39    wants: Vec<ObjectDescriptor>,
40    wanted_types: HashMap<PackObjectId, ObjectType>,
41    want_full_closure: bool,
42}
43
44#[derive(Debug, Clone, Default)]
45pub struct PullObjectMix {
46    pub blobs: usize,
47    pub trees: usize,
48    pub states: usize,
49    pub actions: usize,
50}
51
52impl PullObjectMix {
53    fn record(&mut self, obj_type: ObjectType) {
54        match obj_type {
55            ObjectType::Blob => self.blobs += 1,
56            ObjectType::Tree => self.trees += 1,
57            ObjectType::State => self.states += 1,
58            ObjectType::Action => self.actions += 1,
59        }
60    }
61
62    pub fn total(&self) -> usize {
63        self.blobs + self.trees + self.states + self.actions
64    }
65}
66
67#[derive(Debug, Clone, Default)]
68pub struct PullProfile {
69    pub ready_wait: Duration,
70    pub receive_and_apply: Duration,
71    pub decode: Duration,
72    pub store_receive_object: Duration,
73    pub metadata_sync: Duration,
74    pub pack_decode_apply: Duration,
75    pub raw_decode_apply: Duration,
76    pub pack_decode: Duration,
77    pub raw_decode: Duration,
78    pub bytes_received: usize,
79    pub pack_bytes_received: usize,
80    pub raw_bytes_received: usize,
81    pub objects_received: usize,
82    pub object_mix: PullObjectMix,
83}
84
85impl HostedGrpcClient {
86    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
87        let mut request = Request::new(ListRefsRequest {
88            repo_path: repo_path.to_string(),
89        });
90        self.apply_auth(&mut request)?;
91        let response = self
92            .inner
93            .list_refs(request)
94            .await
95            .map_err(status_to_protocol_error)?
96            .into_inner();
97        response
98            .refs
99            .into_iter()
100            .map(|entry| {
101                Ok(RefEntry {
102                    name: entry.name,
103                    change_id: ChangeId::try_from_slice(&entry.change_id)
104                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
105                    is_thread: entry.is_thread,
106                })
107            })
108            .collect()
109    }
110
111    #[allow(clippy::too_many_arguments)]
112    pub async fn update_ref(
113        &mut self,
114        repo_path: &str,
115        name: &str,
116        is_thread: bool,
117        old_value: Option<ChangeId>,
118        new_value: ChangeId,
119        force: bool,
120        thread_metadata: Option<&SyncedThreadMetadata>,
121    ) -> Result<RefUpdated, ProtocolError> {
122        let mut request = Request::new(UpdateRefRequest {
123            repo_path: repo_path.to_string(),
124            name: name.to_string(),
125            is_thread,
126            force,
127            old_value: old_value
128                .map(|value| value.to_string_full())
129                .unwrap_or_default(),
130            new_value: new_value.to_string_full(),
131            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
132            client_operation_id: String::new(),
133        });
134        self.apply_auth(&mut request)?;
135        let response = self
136            .inner
137            .update_ref(request)
138            .await
139            .map_err(status_to_protocol_error)?
140            .into_inner();
141        Ok(RefUpdated {
142            success: response.success,
143            old_value: if response.old_value.is_empty() {
144                None
145            } else {
146                Some(
147                    ChangeId::parse(&response.old_value)
148                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
149                )
150            },
151            error: (!response.error.is_empty()).then_some(response.error),
152        })
153    }
154
155    pub async fn push(
156        &mut self,
157        repo: &Repository,
158        repo_path: &str,
159        local_state: ChangeId,
160        target_thread: &str,
161        force: bool,
162    ) -> Result<PushComplete, ProtocolError> {
163        let _ = self.transport.chunk_size;
164        let _ = self.transport.resume_attempts;
165        let _ = self.transport.negotiated.chunk_size();
166        let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
167        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
168        let transport_mode = preferred_transport_mode(&self.transport, objects.len());
169        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
170        let request_message = PushMessage {
171            body: Some(push_message::Body::Request(PushRequest {
172                repo_path: repo_path.to_string(),
173                local_state: local_state.to_string_full(),
174                target_thread: target_thread.to_string(),
175                create_thread: true,
176                force,
177                objects: objects.iter().map(to_proto_object_info).collect(),
178                transfer: Some(self.transport.transfer_checkpoint_with_mode(
179                    transfer_id.clone(),
180                    transport_mode,
181                    0,
182                    0,
183                    false,
184                )),
185                partial_fetch_status: partial_fetch_status_for_repo(repo),
186                allow_partial_fetch: true,
187                thread_metadata: thread_metadata
188                    .map(|metadata| to_proto_thread_metadata(&metadata)),
189                client_operation_id: String::new(),
190            })),
191        };
192
193        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
194        tx.send(request_message).await.map_err(|_| {
195            ProtocolError::InvalidState("failed to initialize push stream".to_string())
196        })?;
197        let mut request = Request::new(ReceiverStream::new(rx));
198        self.apply_auth(&mut request)?;
199        let mut response = self
200            .inner
201            .push(request)
202            .await
203            .map_err(status_to_protocol_error)?
204            .into_inner();
205
206        let ready = match response.message().await.map_err(status_to_protocol_error)? {
207            Some(PushMessage {
208                body: Some(push_message::Body::Ready(ready)),
209            }) => ready,
210            _ => {
211                return Err(ProtocolError::InvalidState(
212                    "expected PushReady from gRPC server".to_string(),
213                ));
214            }
215        };
216
217        let object_index = objects
218            .into_iter()
219            .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
220            .collect::<HashMap<_, _>>();
221
222        let ready_transport_mode = ready
223            .transfer
224            .as_ref()
225            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
226            .unwrap_or(transport_mode);
227        let wanted_infos = ready
228            .want_objects
229            .into_iter()
230            .map(|want| {
231                object_index
232                    .get(&descriptor_id(&want))
233                    .cloned()
234                    .ok_or_else(|| {
235                        ProtocolError::InvalidState("server requested unknown object".to_string())
236                    })
237            })
238            .collect::<Result<Vec<_>, _>>()?;
239
240        if !wanted_infos.is_empty() {
241            let bundle = proto::build_native_pack(repo.store(), &wanted_infos)?;
242            for message in encode_native_pack_messages(
243                &bundle,
244                &transfer_id,
245                self.transport.chunk_size.max(1),
246                &self.transport,
247                ready_transport_mode,
248            )? {
249                tx.send(message).await.map_err(|_| {
250                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
251                })?;
252            }
253        }
254        drop(tx);
255
256        let result = match response.message().await.map_err(status_to_protocol_error)? {
257            Some(PushMessage {
258                body: Some(push_message::Body::Complete(complete)),
259            }) => PushComplete {
260                success: complete.success,
261                new_state: if complete.new_state.is_empty() {
262                    None
263                } else {
264                    Some(
265                        ChangeId::parse(&complete.new_state)
266                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
267                    )
268                },
269                error: (!complete.error.is_empty()).then_some(complete.error),
270                transfer_id: complete
271                    .transfer
272                    .as_ref()
273                    .map(|transfer| transfer.transfer_id.clone())
274                    .unwrap_or_default(),
275                transport_mode: complete
276                    .transfer
277                    .as_ref()
278                    .map(|transfer| transport_mode_name(transfer.transport_mode))
279                    .unwrap_or("raw-objects")
280                    .to_string(),
281                resume_offset: complete
282                    .transfer
283                    .as_ref()
284                    .map(|transfer| transfer.resume_offset)
285                    .unwrap_or_default(),
286                chunk_index: complete
287                    .transfer
288                    .as_ref()
289                    .map(|transfer| transfer.chunk_index)
290                    .unwrap_or_default(),
291                checkpoint: complete
292                    .transfer
293                    .as_ref()
294                    .map(|transfer| transfer.checkpoint.clone())
295                    .unwrap_or_default(),
296                is_complete: complete
297                    .transfer
298                    .as_ref()
299                    .map(|transfer| transfer.is_complete)
300                    .unwrap_or(false),
301            },
302            _ => {
303                return Err(ProtocolError::InvalidState(
304                    "expected PushComplete from gRPC server".to_string(),
305                ));
306            }
307        };
308
309        if result.success {
310            self.sync_remote_markers(repo, repo_path, local_state)
311                .await?;
312        }
313        Ok(result)
314    }
315
316    pub async fn pull(
317        &mut self,
318        repo: &Repository,
319        repo_path: &str,
320        remote_thread: &str,
321        local_thread: Option<&str>,
322    ) -> Result<PullComplete, ProtocolError> {
323        self.pull_with_options(
324            repo,
325            repo_path,
326            remote_thread,
327            PullOptions {
328                local_thread,
329                depth: None,
330                target_state: None,
331                materialization: PullMaterialization::Full,
332            },
333        )
334        .await
335    }
336
337    pub async fn pull_profiled(
338        &mut self,
339        repo: &Repository,
340        repo_path: &str,
341        remote_thread: &str,
342        local_thread: Option<&str>,
343    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
344        self.pull_exchange(
345            repo,
346            repo_path,
347            remote_thread,
348            PullOptions {
349                local_thread,
350                depth: None,
351                target_state: None,
352                materialization: PullMaterialization::Full,
353            },
354        )
355        .await
356        .map(|exchange| (exchange.result, exchange.profile))
357    }
358
359    pub async fn pull_partial(
360        &mut self,
361        repo: &Repository,
362        repo_path: &str,
363        remote_thread: &str,
364        local_thread: Option<&str>,
365    ) -> Result<PullComplete, ProtocolError> {
366        self.pull_with_options(
367            repo,
368            repo_path,
369            remote_thread,
370            PullOptions {
371                local_thread,
372                depth: None,
373                target_state: None,
374                materialization: PullMaterialization::Lazy,
375            },
376        )
377        .await
378    }
379
380    pub async fn pull_with_depth_and_materialization(
381        &mut self,
382        repo: &Repository,
383        repo_path: &str,
384        remote_thread: &str,
385        local_thread: Option<&str>,
386        depth: Option<u32>,
387        materialization: PullMaterialization,
388    ) -> Result<PullComplete, ProtocolError> {
389        self.pull_with_options(
390            repo,
391            repo_path,
392            remote_thread,
393            PullOptions {
394                local_thread,
395                depth,
396                target_state: None,
397                materialization,
398            },
399        )
400        .await
401    }
402
403    pub async fn pull_with_depth(
404        &mut self,
405        repo: &Repository,
406        repo_path: &str,
407        remote_thread: &str,
408        local_thread: Option<&str>,
409        depth: Option<u32>,
410    ) -> Result<PullComplete, ProtocolError> {
411        self.pull_with_depth_and_materialization(
412            repo,
413            repo_path,
414            remote_thread,
415            local_thread,
416            depth,
417            PullMaterialization::Full,
418        )
419        .await
420    }
421
422    pub async fn fetch_state(
423        &mut self,
424        repo: &Repository,
425        repo_path: &str,
426        remote_thread: &str,
427        target_state: ChangeId,
428    ) -> Result<usize, ProtocolError> {
429        self.pull_exchange(
430            repo,
431            repo_path,
432            remote_thread,
433            PullOptions {
434                local_thread: None,
435                depth: None,
436                target_state: Some(target_state),
437                materialization: PullMaterialization::Full,
438            },
439        )
440        .await
441        .map(|exchange| exchange.object_count)
442    }
443
444    pub async fn fetch_state_partial(
445        &mut self,
446        repo: &Repository,
447        repo_path: &str,
448        remote_thread: &str,
449        target_state: ChangeId,
450    ) -> Result<usize, ProtocolError> {
451        self.pull_exchange(
452            repo,
453            repo_path,
454            remote_thread,
455            PullOptions {
456                local_thread: None,
457                depth: None,
458                target_state: Some(target_state),
459                materialization: PullMaterialization::Lazy,
460            },
461        )
462        .await
463        .map(|exchange| exchange.object_count)
464    }
465
466    pub async fn hydrate_blob_at_path(
467        &mut self,
468        repo: &Repository,
469        repo_path: &str,
470        reference: &str,
471        path: &str,
472    ) -> Result<objects::object::Blob, ProtocolError> {
473        let mut request = Request::new(GetBlobRequest {
474            repo_path: repo_path.to_string(),
475            r#ref: reference.to_string(),
476            path: path.to_string(),
477        });
478        self.apply_auth(&mut request)?;
479        let response = self
480            .content
481            .get_blob(request)
482            .await
483            .map_err(status_to_protocol_error)?
484            .into_inner();
485
486        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
487        let blob = objects::object::Blob::new(content);
488        repo.store().put_blob(&blob)?;
489        repo.clear_missing_blob(&blob.hash())?;
490        Ok(blob)
491    }
492
493    pub async fn hydrate_missing_blobs_for_state(
494        &mut self,
495        repo: &Repository,
496        repo_path: &str,
497        remote_thread: &str,
498        target_state: ChangeId,
499    ) -> Result<usize, ProtocolError> {
500        let exchange = self
501            .pull_exchange(
502                repo,
503                repo_path,
504                remote_thread,
505                PullOptions {
506                    local_thread: None,
507                    depth: None,
508                    target_state: Some(target_state),
509                    materialization: PullMaterialization::Full,
510                },
511            )
512            .await?;
513        clear_missing_blobs_for_state(repo, target_state)?;
514        Ok(exchange.object_count)
515    }
516
517    async fn pull_with_options(
518        &mut self,
519        repo: &Repository,
520        repo_path: &str,
521        remote_thread: &str,
522        options: PullOptions<'_>,
523    ) -> Result<PullComplete, ProtocolError> {
524        self.pull_exchange(repo, repo_path, remote_thread, options)
525            .await
526            .map(|exchange| exchange.result)
527    }
528
529    async fn pull_exchange(
530        &mut self,
531        repo: &Repository,
532        repo_path: &str,
533        remote_thread: &str,
534        options: PullOptions<'_>,
535    ) -> Result<PullExchange, ProtocolError> {
536        let exchange_start = Instant::now();
537        let mut exclude_states = Vec::new();
538        if let Some(local_thread) = options.local_thread
539            && let Some(head) = repo.refs().get_thread(local_thread)?
540        {
541            exclude_states.push(head);
542        }
543        let allow_partial_fetch = options.materialization.allows_partial_fetch();
544        let fresh_full_pull =
545            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
546
547        let transfer_id = pull_transfer_id(
548            repo_path,
549            remote_thread,
550            options.local_thread,
551            options.depth,
552            options.target_state,
553        );
554        let request_message = PullMessage {
555            body: Some(pull_message::Body::Request(PullRequest {
556                repo_path: repo_path.to_string(),
557                remote_thread: remote_thread.to_string(),
558                local_thread: options.local_thread.unwrap_or_default().to_string(),
559                target_state: options
560                    .target_state
561                    .map(|value| value.to_string_full())
562                    .unwrap_or_default(),
563                depth: options.depth.unwrap_or_default(),
564                exclude_states: exclude_states
565                    .iter()
566                    .map(ChangeId::to_string_full)
567                    .collect(),
568                transfer: Some(self.transport.transfer_checkpoint_with_mode(
569                    transfer_id.clone(),
570                    TransportMode::NativePack,
571                    0,
572                    0,
573                    false,
574                )),
575                partial_fetch_status: partial_fetch_status_for_repo(repo),
576                allow_partial_fetch,
577                fresh_full_pull,
578                client_operation_id: String::new(),
579            })),
580        };
581
582        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
583        tx.send(request_message).await.map_err(|_| {
584            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
585        })?;
586        let mut request = Request::new(ReceiverStream::new(rx));
587        self.apply_auth(&mut request)?;
588        let mut response = self
589            .inner
590            .pull(request)
591            .await
592            .map_err(status_to_protocol_error)?
593            .into_inner();
594
595        let ready = match response.message().await.map_err(status_to_protocol_error)? {
596            Some(PullMessage {
597                body: Some(pull_message::Body::Ready(ready)),
598            }) => ready,
599            _ => {
600                return Err(ProtocolError::InvalidState(
601                    "expected PullReady from gRPC server".to_string(),
602                ));
603            }
604        };
605        let mut profile = PullProfile {
606            ready_wait: exchange_start.elapsed(),
607            ..PullProfile::default()
608        };
609        let remote_state = ChangeId::parse(&ready.remote_state)
610            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
611        let PullWantPlan {
612            wants,
613            wanted_types,
614            want_full_closure,
615        } = plan_pull_wants(
616            repo,
617            &remote_state,
618            ready.full_closure_available,
619            ready.objects_to_fetch,
620            allow_partial_fetch,
621        )?;
622
623        tx.send(PullMessage {
624            body: Some(pull_message::Body::Want(WantObjects {
625                objects: wants.clone(),
626                want_full_closure,
627                transfer: Some(self.transport.transfer_checkpoint_with_mode(
628                    transfer_id.clone(),
629                    TransportMode::NativePack,
630                    0,
631                    0,
632                    false,
633                )),
634            })),
635        })
636        .await
637        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
638        drop(tx);
639
640        let receive_start = Instant::now();
641        let mut pack_state = proto::PackChunkState::default();
642        let mut received = 0usize;
643        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
644            match message.body {
645                Some(pull_message::Body::Pack(chunk)) => {
646                    profile.bytes_received =
647                        profile.bytes_received.saturating_add(chunk.data.len());
648                    profile.pack_bytes_received =
649                        profile.pack_bytes_received.saturating_add(chunk.data.len());
650                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
651                        ProtocolError::InvalidState(
652                            "native pack chunk missing transfer checkpoint".to_string(),
653                        )
654                    })?;
655                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
656                        .unwrap_or(PackStreamKind::Unspecified);
657                    if stream_kind == PackStreamKind::Unspecified {
658                        return Err(ProtocolError::InvalidState(
659                            "native pack chunk missing stream kind".to_string(),
660                        ));
661                    }
662                    let decode_start = Instant::now();
663                    proto::receive_pack_chunk(
664                        &mut pack_state,
665                        stream_kind == PackStreamKind::Index,
666                        transfer.resume_offset,
667                        transfer.chunk_index,
668                        transfer.is_complete,
669                        &chunk.data,
670                        chunk.is_final_chunk,
671                    )?;
672                    let decode_elapsed = decode_start.elapsed();
673                    profile.pack_decode += decode_elapsed;
674                    profile.pack_decode_apply += decode_elapsed;
675                    profile.decode += decode_elapsed;
676                }
677                Some(pull_message::Body::Complete(complete)) => {
678                    profile.receive_and_apply = receive_start.elapsed();
679                    let final_state = if complete.new_state.is_empty() {
680                        None
681                    } else {
682                        Some(
683                            ChangeId::parse(&complete.new_state)
684                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
685                        )
686                    };
687
688                    if complete.success {
689                        if want_full_closure || !wants.is_empty() {
690                            if !pack_state.is_complete() {
691                                return Err(ProtocolError::InvalidState(
692                                    "pull completed before native pack stream finished".to_string(),
693                                ));
694                            }
695                            let store_start = Instant::now();
696                            let installed_ids = proto::install_received_pack(
697                                repo.store(),
698                                &pack_state.pack_data,
699                                &pack_state.index_data,
700                            )?;
701                            profile.store_receive_object += store_start.elapsed();
702                            received = installed_ids.len();
703                            for id in installed_ids {
704                                match (id, wanted_types.get(&id).copied()) {
705                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
706                                        profile.object_mix.record(ObjectType::Blob);
707                                        repo.clear_missing_blob(&hash)?;
708                                    }
709                                    (_, Some(obj_type)) => {
710                                        profile.object_mix.record(obj_type);
711                                    }
712                                    (PackObjectId::ChangeId(_), None) => {
713                                        profile.object_mix.record(ObjectType::State);
714                                    }
715                                    (PackObjectId::Hash(hash), None) => {
716                                        let inferred =
717                                            infer_installed_hash_object_type(repo, &hash)?;
718                                        profile.object_mix.record(inferred);
719                                    }
720                                }
721                            }
722                        }
723
724                        let metadata_start = Instant::now();
725                        if let Some(local_thread) = options.local_thread
726                            && let Some(state) = final_state
727                        {
728                            repo.refs().set_thread(local_thread, &state)?;
729                        }
730                        if let Some(state) = final_state
731                            && allow_partial_fetch
732                        {
733                            mark_missing_blobs_for_state(repo, state)?;
734                        } else if final_state.is_some() {
735                            let _ = repo.clear_all_missing_blobs()?;
736                        }
737                        let synced_markers = complete
738                            .transfer
739                            .as_ref()
740                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
741                            .transpose()?
742                            .unwrap_or(false);
743                        if !synced_markers {
744                            self.sync_local_markers(repo, repo_path).await?;
745                        }
746                        profile.metadata_sync = metadata_start.elapsed();
747                        profile.objects_received = received;
748                        return Ok(PullExchange {
749                            result: PullComplete {
750                                success: true,
751                                final_state,
752                                error: None,
753                                transfer_id: complete
754                                    .transfer
755                                    .as_ref()
756                                    .map(|transfer| transfer.transfer_id.clone())
757                                    .unwrap_or_default(),
758                                transport_mode: complete
759                                    .transfer
760                                    .as_ref()
761                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
762                                    .unwrap_or("native-pack")
763                                    .to_string(),
764                                resume_offset: complete
765                                    .transfer
766                                    .as_ref()
767                                    .map(|transfer| transfer.resume_offset)
768                                    .unwrap_or_default(),
769                                chunk_index: complete
770                                    .transfer
771                                    .as_ref()
772                                    .map(|transfer| transfer.chunk_index)
773                                    .unwrap_or_default(),
774                                checkpoint: complete
775                                    .transfer
776                                    .as_ref()
777                                    .map(|transfer| transfer.checkpoint.clone())
778                                    .unwrap_or_default(),
779                                is_complete: complete
780                                    .transfer
781                                    .as_ref()
782                                    .map(|transfer| transfer.is_complete)
783                                    .unwrap_or(false),
784                            },
785                            object_count: received,
786                            profile,
787                        });
788                    }
789
790                    profile.objects_received = received;
791                    return Ok(PullExchange {
792                        result: PullComplete {
793                            success: false,
794                            final_state,
795                            error: (!complete.error.is_empty()).then_some(complete.error),
796                            transfer_id: complete
797                                .transfer
798                                .as_ref()
799                                .map(|transfer| transfer.transfer_id.clone())
800                                .unwrap_or_default(),
801                            transport_mode: complete
802                                .transfer
803                                .as_ref()
804                                .map(|transfer| transport_mode_name(transfer.transport_mode))
805                                .unwrap_or("native-pack")
806                                .to_string(),
807                            resume_offset: complete
808                                .transfer
809                                .as_ref()
810                                .map(|transfer| transfer.resume_offset)
811                                .unwrap_or_default(),
812                            chunk_index: complete
813                                .transfer
814                                .as_ref()
815                                .map(|transfer| transfer.chunk_index)
816                                .unwrap_or_default(),
817                            checkpoint: complete
818                                .transfer
819                                .as_ref()
820                                .map(|transfer| transfer.checkpoint.clone())
821                                .unwrap_or_default(),
822                            is_complete: complete
823                                .transfer
824                                .as_ref()
825                                .map(|transfer| transfer.is_complete)
826                                .unwrap_or(false),
827                        },
828                        object_count: received,
829                        profile,
830                    });
831                }
832                _ => {}
833            }
834        }
835
836        Err(ProtocolError::InvalidState(format!(
837            "pull stream ended unexpectedly after receiving {received} packed objects"
838        )))
839    }
840}
841
842fn load_thread_metadata(
843    repo: &Repository,
844    target_thread: &str,
845    local_state: ChangeId,
846) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
847    let thread_manager = ThreadManager::new(repo.heddle_dir());
848    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
849}
850
851fn plan_pull_wants(
852    repo: &Repository,
853    remote_state: &ChangeId,
854    full_closure_available: bool,
855    objects_to_fetch: Vec<ObjectDescriptor>,
856    allow_partial_fetch: bool,
857) -> Result<PullWantPlan, ProtocolError> {
858    if full_closure_available {
859        return Ok(PullWantPlan {
860            wants: Vec::new(),
861            wanted_types: HashMap::new(),
862            want_full_closure: true,
863        });
864    }
865    let request_full_closure =
866        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
867    let mut wants = Vec::with_capacity(objects_to_fetch.len());
868    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
869
870    for descriptor in objects_to_fetch {
871        let info = parse_descriptor_to_info(descriptor)?;
872        let pack_id = match &info.id {
873            proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
874            proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
875        };
876        let include = if request_full_closure {
877            true
878        } else {
879            let has = proto::has_object(repo.store(), &info)?;
880            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
881        };
882
883        if include {
884            wanted_types.insert(pack_id, info.obj_type);
885            wants.push(object_descriptor_with_status(
886                &info,
887                ObjectAvailabilityStatus::Missing,
888                "requested by client",
889            ));
890        }
891    }
892
893    Ok(PullWantPlan {
894        wants,
895        wanted_types,
896        want_full_closure: false,
897    })
898}
899
900fn supports_compact_full_pull(
901    repo: &Repository,
902    allow_partial_fetch: bool,
903    exclude_states: &[ChangeId],
904) -> Result<bool, ProtocolError> {
905    if allow_partial_fetch || !exclude_states.is_empty() {
906        return Ok(false);
907    }
908    repo_looks_fresh(repo)
909}
910
911fn should_request_full_closure(
912    repo: &Repository,
913    remote_state: &ChangeId,
914    allow_partial_fetch: bool,
915) -> Result<bool, ProtocolError> {
916    if allow_partial_fetch || repo.store().has_state(remote_state)? {
917        return Ok(false);
918    }
919    repo_looks_fresh(repo)
920}
921
922fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
923    if repo.head()?.is_some() {
924        return Ok(false);
925    }
926    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
927        return Ok(false);
928    }
929    Ok(repo.missing_blobs()?.is_empty())
930}
931
932fn infer_installed_hash_object_type(
933    repo: &Repository,
934    hash: &ContentHash,
935) -> Result<ObjectType, ProtocolError> {
936    let store = repo.store();
937    if store.get_tree(hash)?.is_some() {
938        return Ok(ObjectType::Tree);
939    }
940    if store
941        .get_action(&objects::object::ActionId::from_hash(*hash))?
942        .is_some()
943    {
944        return Ok(ObjectType::Action);
945    }
946    Ok(ObjectType::Blob)
947}
948
949fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
950    const HEADER: &str = "heddle-markers-v1\n";
951    if checkpoint.is_empty() {
952        return Ok(false);
953    }
954    let payload = std::str::from_utf8(checkpoint)
955        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
956    let Some(lines) = payload.strip_prefix(HEADER) else {
957        return Ok(false);
958    };
959
960    for line in lines.lines() {
961        if line.is_empty() {
962            continue;
963        }
964        let Some((name, change_id)) = line.split_once('\t') else {
965            return Err(ProtocolError::InvalidState(
966                "invalid marker snapshot line".to_string(),
967            ));
968        };
969        let change_id = ChangeId::parse(change_id)
970            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
971        if !repo.store().has_state(&change_id)? {
972            continue;
973        }
974        match repo.refs().get_marker(name)? {
975            Some(existing) if existing == change_id => {}
976            Some(existing) => repo.refs().set_marker_cas(
977                name,
978                refs::RefExpectation::Value(existing),
979                &change_id,
980            )?,
981            None => repo.refs().create_marker(name, &change_id)?,
982        }
983    }
984
985    Ok(true)
986}
987
988fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
989    if s.is_empty() {
990        return Vec::new();
991    }
992    objects::object::ChangeId::parse(s)
993        .map(|id| id.as_bytes().to_vec())
994        .unwrap_or_default()
995}
996
997fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
998    ThreadMetadata {
999        name: metadata.thread.clone(),
1000        target_thread: metadata.target_thread.clone(),
1001        parent_thread: metadata.parent_thread.clone(),
1002        task: metadata.task.clone(),
1003        thread_mode: metadata.mode.to_string(),
1004        thread_state: metadata.state.to_string(),
1005        freshness: metadata.freshness.to_string(),
1006        base_state: change_id_string_to_bytes(&metadata.base_state),
1007        base_root: change_id_string_to_bytes(&metadata.base_root),
1008        current_state: metadata
1009            .current_state
1010            .as_deref()
1011            .map(change_id_string_to_bytes),
1012        merged_state: metadata
1013            .merged_state
1014            .as_deref()
1015            .map(change_id_string_to_bytes),
1016        changed_paths: metadata.changed_paths.clone(),
1017        impact_categories: metadata
1018            .impact_categories
1019            .iter()
1020            .map(ToString::to_string)
1021            .collect(),
1022        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1023        promotion_suggested: metadata.promotion_suggested,
1024        verification_summary: Some(ThreadVerificationSummary {
1025            tests_passed: metadata.verification_summary.tests_passed,
1026            tests_failed: metadata
1027                .verification_summary
1028                .tests_failed
1029                .unwrap_or_default(),
1030            coverage_pct: metadata.verification_summary.coverage_pct,
1031            lint_warnings: metadata.verification_summary.lint_warnings,
1032        }),
1033        confidence_summary: Some(ThreadConfidenceSummary {
1034            value: metadata.confidence_summary.value,
1035            band: metadata
1036                .confidence_summary
1037                .band
1038                .as_ref()
1039                .map(ToString::to_string),
1040        }),
1041        integration_policy_result: Some(ThreadIntegrationPolicy {
1042            status: metadata
1043                .integration_policy_result
1044                .status
1045                .clone()
1046                .unwrap_or_default(),
1047            reason: metadata
1048                .integration_policy_result
1049                .reason
1050                .clone()
1051                .unwrap_or_default(),
1052        }),
1053        created_at: Some(prost_types::Timestamp {
1054            seconds: metadata.created_at.timestamp(),
1055            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1056        }),
1057        updated_at: Some(prost_types::Timestamp {
1058            seconds: metadata.updated_at.timestamp(),
1059            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1060        }),
1061    }
1062}
1063
1064struct PullExchange {
1065    result: PullComplete,
1066    object_count: usize,
1067    profile: PullProfile,
1068}
1069
1070fn mark_missing_blobs_for_state(
1071    repo: &Repository,
1072    state_id: ChangeId,
1073) -> Result<(), ProtocolError> {
1074    let state = repo
1075        .store()
1076        .get_state(&state_id)?
1077        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1078    let mut missing = collect_missing_blobs(repo, &state.tree);
1079    if let Some(context_root) = state.context.as_ref() {
1080        missing.extend(collect_missing_blobs(repo, context_root));
1081    }
1082    missing
1083        .into_iter()
1084        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1085}
1086
1087fn clear_missing_blobs_for_state(
1088    repo: &Repository,
1089    state_id: ChangeId,
1090) -> Result<(), ProtocolError> {
1091    let state = repo
1092        .store()
1093        .get_state(&state_id)?
1094        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1095    let mut missing = collect_missing_blobs(repo, &state.tree);
1096    if let Some(context_root) = state.context.as_ref() {
1097        missing.extend(collect_missing_blobs(repo, context_root));
1098    }
1099    missing
1100        .into_iter()
1101        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1102}
1103
1104fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1105    let mut missing = Vec::new();
1106    collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1107    missing
1108}
1109
1110fn collect_missing_blobs_recursive(
1111    repo: &Repository,
1112    tree_hash: &ContentHash,
1113    missing: &mut Vec<ContentHash>,
1114) {
1115    let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1116        return;
1117    };
1118    for entry in tree.entries() {
1119        match entry.entry_type {
1120            objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1121                if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1122                    missing.push(entry.hash);
1123                }
1124            }
1125            objects::object::EntryType::Tree => {
1126                collect_missing_blobs_recursive(repo, &entry.hash, missing);
1127            }
1128        }
1129    }
1130}
1131
1132fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1133    match repo.missing_blobs() {
1134        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1135        Ok(_) => PartialFetchStatus::Disabled as i32,
1136        Err(_) => PartialFetchStatus::Unspecified as i32,
1137    }
1138}
1139
1140fn pull_transfer_id(
1141    repo_path: &str,
1142    remote_thread: &str,
1143    local_thread: Option<&str>,
1144    depth: Option<u32>,
1145    target_state: Option<ChangeId>,
1146) -> String {
1147    format!(
1148        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1149        local_thread.unwrap_or_default(),
1150        target_state
1151            .map(|value| value.to_string_full())
1152            .unwrap_or_default()
1153    )
1154}
1155
1156fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1157    format!(
1158        "push:{repo_path}:{}:{target_thread}",
1159        local_state.to_string_full()
1160    )
1161}
1162
1163fn encode_native_pack_messages(
1164    bundle: &proto::NativePackBundle,
1165    transfer_id: &str,
1166    chunk_size: usize,
1167    transport: &super::helpers::HostedTransportPolicy,
1168    transport_mode: TransportMode,
1169) -> Result<Vec<PushMessage>, ProtocolError> {
1170    let mut messages = Vec::new();
1171    let chunk_size = chunk_size.max(1);
1172
1173    let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1174    for chunk_index in 0..pack_total_chunks.max(1) {
1175        let Some((start, len)) =
1176            proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1177        else {
1178            break;
1179        };
1180        messages.push(PushMessage {
1181            body: Some(push_message::Body::Pack(PackChunk {
1182                stream_kind: PackStreamKind::Pack as i32,
1183                data: bundle.pack_data[start..start + len].to_vec(),
1184                transfer: Some(transport.transfer_checkpoint_with_mode(
1185                    transfer_id,
1186                    transport_mode,
1187                    chunk_index as u32,
1188                    start as u64,
1189                    chunk_index + 1 == pack_total_chunks,
1190                )),
1191                chunk_length: len as u32,
1192                is_final_chunk: chunk_index + 1 == pack_total_chunks,
1193            })),
1194        });
1195    }
1196
1197    let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1198    for chunk_index in 0..index_total_chunks.max(1) {
1199        let Some((start, len)) =
1200            proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1201        else {
1202            break;
1203        };
1204        messages.push(PushMessage {
1205            body: Some(push_message::Body::Pack(PackChunk {
1206                stream_kind: PackStreamKind::Index as i32,
1207                data: bundle.index_data[start..start + len].to_vec(),
1208                transfer: Some(transport.transfer_checkpoint_with_mode(
1209                    transfer_id,
1210                    transport_mode,
1211                    chunk_index as u32,
1212                    start as u64,
1213                    chunk_index + 1 == index_total_chunks,
1214                )),
1215                chunk_length: len as u32,
1216                is_final_chunk: chunk_index + 1 == index_total_chunks,
1217            })),
1218        });
1219    }
1220    Ok(messages)
1221}
1222
1223fn preferred_transport_mode(
1224    transport: &super::helpers::HostedTransportPolicy,
1225    object_count: usize,
1226) -> TransportMode {
1227    let _ = transport;
1228    let _ = object_count;
1229    TransportMode::NativePack
1230}