Skip to main content

heddle_client/grpc_hosted/
sync.rs

1use std::{
2    collections::HashMap,
3    time::{Duration, Instant},
4};
5
6use grpc::heddle::v1::{
7    GetBlobRequest, ListRefsRequest, ObjectAvailabilityStatus, ObjectDescriptor, PackChunk,
8    PackStreamKind, PartialFetchStatus, PullMessage, PullRequest, PushMessage, PushRequest,
9    RedactionTransfer, ThreadConfidenceSummary, ThreadIntegrationPolicy, ThreadMetadata,
10    ThreadVerificationSummary, TransportMode, UpdateRefRequest, WantObjects, pull_message,
11    push_message,
12};
13use objects::{
14    object::{ChangeId, ContentHash},
15    store::PackObjectId,
16};
17use proto::{ObjectType, ProtocolError, PullComplete, PushComplete, RefEntry, RefUpdated};
18use repo::{Repository, SyncedThreadMetadata, ThreadManager};
19use tokio::sync::mpsc;
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::Request;
22
23use super::{
24    HostedGrpcClient, PullMaterialization,
25    helpers::{
26        descriptor_id, object_descriptor_with_status, parse_descriptor_to_info,
27        status_to_protocol_error, to_proto_object_info, transport_mode_name,
28    },
29};
30
/// Per-call knobs for a pull exchange, bundled so the public `pull*`,
/// `fetch_state*` and `hydrate_*` entry points can share one implementation.
#[derive(Clone, Copy)]
struct PullOptions<'a> {
    /// Local thread to pull into. Its current head (if any) is sent to the
    /// server as an exclude state, and on success it is set to the pulled
    /// state. `None` leaves local thread refs untouched.
    local_thread: Option<&'a str>,
    /// History depth limit forwarded to the server; `None` is sent as `0`.
    depth: Option<u32>,
    /// Explicit state to request; `None` is sent as an empty string.
    target_state: Option<ChangeId>,
    /// Materialization mode; its `allows_partial_fetch()` decides whether the
    /// pull may leave blobs missing locally (they are then recorded as missing).
    materialization: PullMaterialization,
}
38
/// Plan for the `WantObjects` message sent after the server's `PullReady`,
/// as produced by `plan_pull_wants`.
struct PullWantPlan {
    /// Object descriptors requested from the server.
    wants: Vec<ObjectDescriptor>,
    /// Expected type of each wanted object; used to classify objects installed
    /// from the received pack and to clear missing-blob markers for blobs.
    wanted_types: HashMap<PackObjectId, ObjectType>,
    /// Forwarded as `WantObjects.want_full_closure`. When set (or when `wants`
    /// is non-empty) the client requires a complete native pack before
    /// accepting a successful `PullComplete`.
    want_full_closure: bool,
}
44
45#[derive(Debug, Clone, Default)]
46pub struct PullObjectMix {
47    pub blobs: usize,
48    pub trees: usize,
49    pub states: usize,
50    pub actions: usize,
51    pub redactions: usize,
52}
53
54impl PullObjectMix {
55    fn record(&mut self, obj_type: ObjectType) {
56        match obj_type {
57            ObjectType::Blob => self.blobs += 1,
58            ObjectType::Tree => self.trees += 1,
59            ObjectType::State => self.states += 1,
60            ObjectType::Action => self.actions += 1,
61            ObjectType::Redaction => self.redactions += 1,
62        }
63    }
64
65    pub fn total(&self) -> usize {
66        self.blobs + self.trees + self.states + self.actions + self.redactions
67    }
68}
69
/// Timing and volume breakdown of a single pull exchange; returned alongside
/// the pull result by `pull_profiled`.
#[derive(Debug, Clone, Default)]
pub struct PullProfile {
    /// Time from the start of the exchange until the server's `PullReady`
    /// message was received.
    pub ready_wait: Duration,
    /// Time from entering the receive loop until `PullComplete` arrived.
    pub receive_and_apply: Duration,
    /// Accumulated decode time; the native-pack path adds each chunk's
    /// `receive_pack_chunk` time here.
    pub decode: Duration,
    /// Time spent persisting received data: installing the native pack and
    /// accepting redaction sidecars.
    pub store_receive_object: Duration,
    /// Time spent after completion updating the local thread ref, missing-blob
    /// markers and remote markers.
    pub metadata_sync: Duration,
    /// Native pack chunk decode time (accumulated together with `pack_decode`).
    pub pack_decode_apply: Duration,
    /// NOTE(review): raw-objects counterpart of `pack_decode_apply`; not written
    /// by the native-pack pull path seen during review — confirm where it is set.
    pub raw_decode_apply: Duration,
    /// Time spent decoding native pack chunks.
    pub pack_decode: Duration,
    /// NOTE(review): raw-objects counterpart of `pack_decode`; not written by
    /// the native-pack pull path seen during review — confirm where it is set.
    pub raw_decode: Duration,
    /// Total payload bytes received: native pack chunk data plus redaction
    /// sidecar bytes.
    pub bytes_received: usize,
    /// Bytes of native pack chunk data received.
    pub pack_bytes_received: usize,
    /// NOTE(review): raw-object payload bytes; not written by the native-pack
    /// pull path seen during review — confirm where it is set.
    pub raw_bytes_received: usize,
    /// Number of objects installed from the received native pack (redaction
    /// sidecars are not counted here).
    pub objects_received: usize,
    /// Per-type breakdown of received objects, including redaction sidecars.
    pub object_mix: PullObjectMix,
}
87
88impl HostedGrpcClient {
89    pub async fn list_refs(&mut self, repo_path: &str) -> Result<Vec<RefEntry>, ProtocolError> {
90        let mut request = Request::new(ListRefsRequest {
91            repo_path: repo_path.to_string(),
92        });
93        self.apply_auth(&mut request)?;
94        let response = self
95            .inner
96            .list_refs(request)
97            .await
98            .map_err(status_to_protocol_error)?
99            .into_inner();
100        response
101            .refs
102            .into_iter()
103            .map(|entry| {
104                Ok(RefEntry {
105                    name: entry.name,
106                    change_id: ChangeId::try_from_slice(&entry.change_id)
107                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
108                    is_thread: entry.is_thread,
109                })
110            })
111            .collect()
112    }
113
114    #[allow(clippy::too_many_arguments)]
115    pub async fn update_ref(
116        &mut self,
117        repo_path: &str,
118        name: &str,
119        is_thread: bool,
120        old_value: Option<ChangeId>,
121        new_value: ChangeId,
122        force: bool,
123        thread_metadata: Option<&SyncedThreadMetadata>,
124    ) -> Result<RefUpdated, ProtocolError> {
125        let mut request = Request::new(UpdateRefRequest {
126            repo_path: repo_path.to_string(),
127            name: name.to_string(),
128            is_thread,
129            force,
130            old_value: old_value
131                .map(|value| value.to_string_full())
132                .unwrap_or_default(),
133            new_value: new_value.to_string_full(),
134            thread_metadata: thread_metadata.map(to_proto_thread_metadata),
135            client_operation_id: String::new(),
136        });
137        self.apply_auth(&mut request)?;
138        let response = self
139            .inner
140            .update_ref(request)
141            .await
142            .map_err(status_to_protocol_error)?
143            .into_inner();
144        Ok(RefUpdated {
145            success: response.success,
146            old_value: if response.old_value.is_empty() {
147                None
148            } else {
149                Some(
150                    ChangeId::parse(&response.old_value)
151                        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
152                )
153            },
154            error: (!response.error.is_empty()).then_some(response.error),
155        })
156    }
157
158    pub async fn push(
159        &mut self,
160        repo: &Repository,
161        repo_path: &str,
162        local_state: ChangeId,
163        target_thread: &str,
164        force: bool,
165    ) -> Result<PushComplete, ProtocolError> {
166        let _ = self.transport.chunk_size;
167        let _ = self.transport.resume_attempts;
168        let _ = self.transport.negotiated.chunk_size();
169        let objects = proto::enumerate_state_closure(repo.store(), local_state)?;
170        let transfer_id = push_transfer_id(repo_path, local_state, target_thread);
171        let transport_mode = preferred_transport_mode(&self.transport, objects.len());
172        let thread_metadata = load_thread_metadata(repo, target_thread, local_state)?;
173        let request_message = PushMessage {
174            body: Some(push_message::Body::Request(PushRequest {
175                repo_path: repo_path.to_string(),
176                local_state: local_state.to_string_full(),
177                target_thread: target_thread.to_string(),
178                create_thread: true,
179                force,
180                objects: objects.iter().map(to_proto_object_info).collect(),
181                transfer: Some(self.transport.transfer_checkpoint_with_mode(
182                    transfer_id.clone(),
183                    transport_mode,
184                    0,
185                    0,
186                    false,
187                )),
188                partial_fetch_status: partial_fetch_status_for_repo(repo),
189                allow_partial_fetch: true,
190                thread_metadata: thread_metadata
191                    .map(|metadata| to_proto_thread_metadata(&metadata)),
192                client_operation_id: String::new(),
193            })),
194        };
195
196        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
197        tx.send(request_message).await.map_err(|_| {
198            ProtocolError::InvalidState("failed to initialize push stream".to_string())
199        })?;
200        let mut request = Request::new(ReceiverStream::new(rx));
201        self.apply_auth(&mut request)?;
202        let mut response = self
203            .inner
204            .push(request)
205            .await
206            .map_err(status_to_protocol_error)?
207            .into_inner();
208
209        let ready = match response.message().await.map_err(status_to_protocol_error)? {
210            Some(PushMessage {
211                body: Some(push_message::Body::Ready(ready)),
212            }) => ready,
213            _ => {
214                return Err(ProtocolError::InvalidState(
215                    "expected PushReady from gRPC server".to_string(),
216                ));
217            }
218        };
219
220        let object_index = objects
221            .into_iter()
222            .map(|info| (descriptor_id(&to_proto_object_info(&info)), info))
223            .collect::<HashMap<_, _>>();
224
225        let ready_transport_mode = ready
226            .transfer
227            .as_ref()
228            .and_then(|transfer| TransportMode::try_from(transfer.transport_mode).ok())
229            .unwrap_or(transport_mode);
230        let wanted_infos = ready
231            .want_objects
232            .into_iter()
233            .map(|want| {
234                object_index
235                    .get(&descriptor_id(&want))
236                    .cloned()
237                    .ok_or_else(|| {
238                        ProtocolError::InvalidState("server requested unknown object".to_string())
239                    })
240            })
241            .collect::<Result<Vec<_>, _>>()?;
242
243        // Split want_objects: blob/tree/state/action → native pack;
244        // redactions → out-of-pack `RedactionTransfer` channel (the
245        // sidecar lives outside `.heddle/objects/` so GC can't reach
246        // it and it can't ride the pack — `build_native_pack` already
247        // skips Redaction entries on the sender side).
248        let (wanted_redactions, wanted_packable): (Vec<_>, Vec<_>) = wanted_infos
249            .into_iter()
250            .partition(|info| info.obj_type == proto::ObjectType::Redaction);
251
252        if !wanted_packable.is_empty() {
253            let bundle = proto::build_native_pack(repo.store(), &wanted_packable)?;
254            for message in encode_native_pack_messages(
255                &bundle,
256                &transfer_id,
257                self.transport.chunk_size.max(1),
258                &self.transport,
259                ready_transport_mode,
260            )? {
261                tx.send(message).await.map_err(|_| {
262                    ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
263                })?;
264            }
265        }
266
267        for info in wanted_redactions {
268            let proto::ObjectId::Hash(blob) = info.id else {
269                return Err(ProtocolError::InvalidState(
270                    "wanted Redaction must be keyed by ObjectId::Hash(content_hash)".to_string(),
271                ));
272            };
273            // Sender-side: load the byte-identical sidecar payload
274            // that `Repository::put_redaction` wrote to disk. The
275            // receiver verifies the signature + trust list and then
276            // persists these bytes verbatim.
277            let bytes = repo
278                .store()
279                .get_redactions_bytes_for_blob(&blob)
280                .map_err(|err| {
281                    ProtocolError::InvalidState(format!(
282                        "load redactions sidecar for {}: {err}",
283                        blob.to_hex()
284                    ))
285                })?
286                .ok_or_else(|| {
287                    ProtocolError::InvalidState(format!(
288                        "server wants redaction for blob {} but sender has no sidecar",
289                        blob.to_hex()
290                    ))
291                })?;
292            let message = PushMessage {
293                body: Some(push_message::Body::Redaction(RedactionTransfer {
294                    blob_hash: blob.to_hex(),
295                    redactions_blob: bytes,
296                })),
297            };
298            tx.send(message).await.map_err(|_| {
299                ProtocolError::InvalidState("push stream closed unexpectedly".to_string())
300            })?;
301        }
302        drop(tx);
303
304        let result = match response.message().await.map_err(status_to_protocol_error)? {
305            Some(PushMessage {
306                body: Some(push_message::Body::Complete(complete)),
307            }) => PushComplete {
308                success: complete.success,
309                new_state: if complete.new_state.is_empty() {
310                    None
311                } else {
312                    Some(
313                        ChangeId::parse(&complete.new_state)
314                            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
315                    )
316                },
317                error: (!complete.error.is_empty()).then_some(complete.error),
318                transfer_id: complete
319                    .transfer
320                    .as_ref()
321                    .map(|transfer| transfer.transfer_id.clone())
322                    .unwrap_or_default(),
323                transport_mode: complete
324                    .transfer
325                    .as_ref()
326                    .map(|transfer| transport_mode_name(transfer.transport_mode))
327                    .unwrap_or("raw-objects")
328                    .to_string(),
329                resume_offset: complete
330                    .transfer
331                    .as_ref()
332                    .map(|transfer| transfer.resume_offset)
333                    .unwrap_or_default(),
334                chunk_index: complete
335                    .transfer
336                    .as_ref()
337                    .map(|transfer| transfer.chunk_index)
338                    .unwrap_or_default(),
339                checkpoint: complete
340                    .transfer
341                    .as_ref()
342                    .map(|transfer| transfer.checkpoint.clone())
343                    .unwrap_or_default(),
344                is_complete: complete
345                    .transfer
346                    .as_ref()
347                    .map(|transfer| transfer.is_complete)
348                    .unwrap_or(false),
349            },
350            _ => {
351                return Err(ProtocolError::InvalidState(
352                    "expected PushComplete from gRPC server".to_string(),
353                ));
354            }
355        };
356
357        if result.success {
358            self.sync_remote_markers(repo, repo_path, local_state)
359                .await?;
360        }
361        Ok(result)
362    }
363
364    pub async fn pull(
365        &mut self,
366        repo: &Repository,
367        repo_path: &str,
368        remote_thread: &str,
369        local_thread: Option<&str>,
370    ) -> Result<PullComplete, ProtocolError> {
371        self.pull_with_options(
372            repo,
373            repo_path,
374            remote_thread,
375            PullOptions {
376                local_thread,
377                depth: None,
378                target_state: None,
379                materialization: PullMaterialization::Full,
380            },
381        )
382        .await
383    }
384
385    pub async fn pull_profiled(
386        &mut self,
387        repo: &Repository,
388        repo_path: &str,
389        remote_thread: &str,
390        local_thread: Option<&str>,
391    ) -> Result<(PullComplete, PullProfile), ProtocolError> {
392        self.pull_exchange(
393            repo,
394            repo_path,
395            remote_thread,
396            PullOptions {
397                local_thread,
398                depth: None,
399                target_state: None,
400                materialization: PullMaterialization::Full,
401            },
402        )
403        .await
404        .map(|exchange| (exchange.result, exchange.profile))
405    }
406
407    pub async fn pull_partial(
408        &mut self,
409        repo: &Repository,
410        repo_path: &str,
411        remote_thread: &str,
412        local_thread: Option<&str>,
413    ) -> Result<PullComplete, ProtocolError> {
414        self.pull_with_options(
415            repo,
416            repo_path,
417            remote_thread,
418            PullOptions {
419                local_thread,
420                depth: None,
421                target_state: None,
422                materialization: PullMaterialization::Lazy,
423            },
424        )
425        .await
426    }
427
428    pub async fn pull_with_depth_and_materialization(
429        &mut self,
430        repo: &Repository,
431        repo_path: &str,
432        remote_thread: &str,
433        local_thread: Option<&str>,
434        depth: Option<u32>,
435        materialization: PullMaterialization,
436    ) -> Result<PullComplete, ProtocolError> {
437        self.pull_with_options(
438            repo,
439            repo_path,
440            remote_thread,
441            PullOptions {
442                local_thread,
443                depth,
444                target_state: None,
445                materialization,
446            },
447        )
448        .await
449    }
450
451    pub async fn pull_with_depth(
452        &mut self,
453        repo: &Repository,
454        repo_path: &str,
455        remote_thread: &str,
456        local_thread: Option<&str>,
457        depth: Option<u32>,
458    ) -> Result<PullComplete, ProtocolError> {
459        self.pull_with_depth_and_materialization(
460            repo,
461            repo_path,
462            remote_thread,
463            local_thread,
464            depth,
465            PullMaterialization::Full,
466        )
467        .await
468    }
469
470    pub async fn fetch_state(
471        &mut self,
472        repo: &Repository,
473        repo_path: &str,
474        remote_thread: &str,
475        target_state: ChangeId,
476    ) -> Result<usize, ProtocolError> {
477        self.pull_exchange(
478            repo,
479            repo_path,
480            remote_thread,
481            PullOptions {
482                local_thread: None,
483                depth: None,
484                target_state: Some(target_state),
485                materialization: PullMaterialization::Full,
486            },
487        )
488        .await
489        .map(|exchange| exchange.object_count)
490    }
491
492    pub async fn fetch_state_partial(
493        &mut self,
494        repo: &Repository,
495        repo_path: &str,
496        remote_thread: &str,
497        target_state: ChangeId,
498    ) -> Result<usize, ProtocolError> {
499        self.pull_exchange(
500            repo,
501            repo_path,
502            remote_thread,
503            PullOptions {
504                local_thread: None,
505                depth: None,
506                target_state: Some(target_state),
507                materialization: PullMaterialization::Lazy,
508            },
509        )
510        .await
511        .map(|exchange| exchange.object_count)
512    }
513
514    pub async fn hydrate_blob_at_path(
515        &mut self,
516        repo: &Repository,
517        repo_path: &str,
518        reference: &str,
519        path: &str,
520    ) -> Result<objects::object::Blob, ProtocolError> {
521        let mut request = Request::new(GetBlobRequest {
522            repo_path: repo_path.to_string(),
523            r#ref: reference.to_string(),
524            path: path.to_string(),
525        });
526        self.apply_auth(&mut request)?;
527        let response = self
528            .content
529            .get_blob(request)
530            .await
531            .map_err(status_to_protocol_error)?
532            .into_inner();
533
534        let content = super::helpers::decode_blob_content(response.content, response.is_binary)?;
535        let blob = objects::object::Blob::new(content);
536        repo.store().put_blob(&blob)?;
537        repo.clear_missing_blob(&blob.hash())?;
538        Ok(blob)
539    }
540
541    pub async fn hydrate_missing_blobs_for_state(
542        &mut self,
543        repo: &Repository,
544        repo_path: &str,
545        remote_thread: &str,
546        target_state: ChangeId,
547    ) -> Result<usize, ProtocolError> {
548        let exchange = self
549            .pull_exchange(
550                repo,
551                repo_path,
552                remote_thread,
553                PullOptions {
554                    local_thread: None,
555                    depth: None,
556                    target_state: Some(target_state),
557                    materialization: PullMaterialization::Full,
558                },
559            )
560            .await?;
561        clear_missing_blobs_for_state(repo, target_state)?;
562        Ok(exchange.object_count)
563    }
564
565    async fn pull_with_options(
566        &mut self,
567        repo: &Repository,
568        repo_path: &str,
569        remote_thread: &str,
570        options: PullOptions<'_>,
571    ) -> Result<PullComplete, ProtocolError> {
572        self.pull_exchange(repo, repo_path, remote_thread, options)
573            .await
574            .map(|exchange| exchange.result)
575    }
576
577    async fn pull_exchange(
578        &mut self,
579        repo: &Repository,
580        repo_path: &str,
581        remote_thread: &str,
582        options: PullOptions<'_>,
583    ) -> Result<PullExchange, ProtocolError> {
584        let exchange_start = Instant::now();
585        let mut exclude_states = Vec::new();
586        if let Some(local_thread) = options.local_thread
587            && let Some(head) = repo.refs().get_thread(local_thread)?
588        {
589            exclude_states.push(head);
590        }
591        let allow_partial_fetch = options.materialization.allows_partial_fetch();
592        let fresh_full_pull =
593            supports_compact_full_pull(repo, allow_partial_fetch, &exclude_states)?;
594
595        let transfer_id = pull_transfer_id(
596            repo_path,
597            remote_thread,
598            options.local_thread,
599            options.depth,
600            options.target_state,
601        );
602        let request_message = PullMessage {
603            body: Some(pull_message::Body::Request(PullRequest {
604                repo_path: repo_path.to_string(),
605                remote_thread: remote_thread.to_string(),
606                local_thread: options.local_thread.unwrap_or_default().to_string(),
607                target_state: options
608                    .target_state
609                    .map(|value| value.to_string_full())
610                    .unwrap_or_default(),
611                depth: options.depth.unwrap_or_default(),
612                exclude_states: exclude_states
613                    .iter()
614                    .map(ChangeId::to_string_full)
615                    .collect(),
616                transfer: Some(self.transport.transfer_checkpoint_with_mode(
617                    transfer_id.clone(),
618                    TransportMode::NativePack,
619                    0,
620                    0,
621                    false,
622                )),
623                partial_fetch_status: partial_fetch_status_for_repo(repo),
624                allow_partial_fetch,
625                fresh_full_pull,
626                client_operation_id: String::new(),
627            })),
628        };
629
630        let (tx, rx) = mpsc::channel(self.transport.max_inflight_objects.max(4));
631        tx.send(request_message).await.map_err(|_| {
632            ProtocolError::InvalidState("failed to initialize pull stream".to_string())
633        })?;
634        let mut request = Request::new(ReceiverStream::new(rx));
635        self.apply_auth(&mut request)?;
636        let mut response = self
637            .inner
638            .pull(request)
639            .await
640            .map_err(status_to_protocol_error)?
641            .into_inner();
642
643        let ready = match response.message().await.map_err(status_to_protocol_error)? {
644            Some(PullMessage {
645                body: Some(pull_message::Body::Ready(ready)),
646            }) => ready,
647            _ => {
648                return Err(ProtocolError::InvalidState(
649                    "expected PullReady from gRPC server".to_string(),
650                ));
651            }
652        };
653        let mut profile = PullProfile {
654            ready_wait: exchange_start.elapsed(),
655            ..PullProfile::default()
656        };
657        let remote_state = ChangeId::parse(&ready.remote_state)
658            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
659        let PullWantPlan {
660            wants,
661            wanted_types,
662            want_full_closure,
663        } = plan_pull_wants(
664            repo,
665            &remote_state,
666            ready.full_closure_available,
667            ready.objects_to_fetch,
668            allow_partial_fetch,
669        )?;
670
671        tx.send(PullMessage {
672            body: Some(pull_message::Body::Want(WantObjects {
673                objects: wants.clone(),
674                want_full_closure,
675                transfer: Some(self.transport.transfer_checkpoint_with_mode(
676                    transfer_id.clone(),
677                    TransportMode::NativePack,
678                    0,
679                    0,
680                    false,
681                )),
682            })),
683        })
684        .await
685        .map_err(|_| ProtocolError::InvalidState("pull stream closed unexpectedly".to_string()))?;
686        drop(tx);
687
688        let receive_start = Instant::now();
689        let mut pack_state = proto::PackChunkState::default();
690        let mut received = 0usize;
691        while let Some(message) = response.message().await.map_err(status_to_protocol_error)? {
692            match message.body {
693                Some(pull_message::Body::Pack(chunk)) => {
694                    profile.bytes_received =
695                        profile.bytes_received.saturating_add(chunk.data.len());
696                    profile.pack_bytes_received =
697                        profile.pack_bytes_received.saturating_add(chunk.data.len());
698                    let transfer = chunk.transfer.as_ref().ok_or_else(|| {
699                        ProtocolError::InvalidState(
700                            "native pack chunk missing transfer checkpoint".to_string(),
701                        )
702                    })?;
703                    let stream_kind = PackStreamKind::try_from(chunk.stream_kind)
704                        .unwrap_or(PackStreamKind::Unspecified);
705                    if stream_kind == PackStreamKind::Unspecified {
706                        return Err(ProtocolError::InvalidState(
707                            "native pack chunk missing stream kind".to_string(),
708                        ));
709                    }
710                    let decode_start = Instant::now();
711                    proto::receive_pack_chunk(
712                        &mut pack_state,
713                        stream_kind == PackStreamKind::Index,
714                        transfer.resume_offset,
715                        transfer.chunk_index,
716                        transfer.is_complete,
717                        &chunk.data,
718                        chunk.is_final_chunk,
719                    )?;
720                    let decode_elapsed = decode_start.elapsed();
721                    profile.pack_decode += decode_elapsed;
722                    profile.pack_decode_apply += decode_elapsed;
723                    profile.decode += decode_elapsed;
724                }
725                Some(pull_message::Body::Redaction(transfer)) => {
726                    // Out-of-pack channel: receive a redaction sidecar
727                    // and route through `Repository::accept_wire_redactions`
728                    // for signature + trust-list verification. The
729                    // server emitted these only for blobs in our want
730                    // set that carry an active redaction.
731                    profile.bytes_received = profile
732                        .bytes_received
733                        .saturating_add(transfer.redactions_blob.len());
734                    profile.object_mix.record(ObjectType::Redaction);
735                    let blob = ContentHash::from_hex(&transfer.blob_hash).map_err(|err| {
736                        ProtocolError::InvalidState(format!(
737                            "RedactionTransfer.blob_hash is not a valid content hash: {err}"
738                        ))
739                    })?;
740                    let decode_start = Instant::now();
741                    repo.accept_wire_redactions(blob, &transfer.redactions_blob)
742                        .map_err(|err| {
743                            ProtocolError::InvalidState(format!(
744                                "accept_wire_redactions for blob {}: {err}",
745                                transfer.blob_hash
746                            ))
747                        })?;
748                    let decode_elapsed = decode_start.elapsed();
749                    profile.store_receive_object += decode_elapsed;
750                }
751                Some(pull_message::Body::Complete(complete)) => {
752                    profile.receive_and_apply = receive_start.elapsed();
753                    let final_state = if complete.new_state.is_empty() {
754                        None
755                    } else {
756                        Some(
757                            ChangeId::parse(&complete.new_state)
758                                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?,
759                        )
760                    };
761
762                    if complete.success {
763                        if want_full_closure || !wants.is_empty() {
764                            if !pack_state.is_complete() {
765                                return Err(ProtocolError::InvalidState(
766                                    "pull completed before native pack stream finished".to_string(),
767                                ));
768                            }
769                            let store_start = Instant::now();
770                            let installed_ids = proto::install_received_pack(
771                                repo.store(),
772                                &pack_state.pack_data,
773                                &pack_state.index_data,
774                            )?;
775                            profile.store_receive_object += store_start.elapsed();
776                            received = installed_ids.len();
777                            for id in installed_ids {
778                                match (id, wanted_types.get(&id).copied()) {
779                                    (PackObjectId::Hash(hash), Some(ObjectType::Blob)) => {
780                                        profile.object_mix.record(ObjectType::Blob);
781                                        repo.clear_missing_blob(&hash)?;
782                                    }
783                                    (_, Some(obj_type)) => {
784                                        profile.object_mix.record(obj_type);
785                                    }
786                                    (PackObjectId::ChangeId(_), None) => {
787                                        profile.object_mix.record(ObjectType::State);
788                                    }
789                                    (PackObjectId::Hash(hash), None) => {
790                                        let inferred =
791                                            infer_installed_hash_object_type(repo, &hash)?;
792                                        profile.object_mix.record(inferred);
793                                    }
794                                }
795                            }
796                        }
797
798                        let metadata_start = Instant::now();
799                        if let Some(local_thread) = options.local_thread
800                            && let Some(state) = final_state
801                        {
802                            repo.refs().set_thread(local_thread, &state)?;
803                        }
804                        if let Some(state) = final_state
805                            && allow_partial_fetch
806                        {
807                            mark_missing_blobs_for_state(repo, state)?;
808                        } else if final_state.is_some() {
809                            let _ = repo.clear_all_missing_blobs()?;
810                        }
811                        let synced_markers = complete
812                            .transfer
813                            .as_ref()
814                            .map(|transfer| apply_marker_snapshot(repo, &transfer.checkpoint))
815                            .transpose()?
816                            .unwrap_or(false);
817                        if !synced_markers {
818                            self.sync_local_markers(repo, repo_path).await?;
819                        }
820                        profile.metadata_sync = metadata_start.elapsed();
821                        profile.objects_received = received;
822                        return Ok(PullExchange {
823                            result: PullComplete {
824                                success: true,
825                                final_state,
826                                error: None,
827                                transfer_id: complete
828                                    .transfer
829                                    .as_ref()
830                                    .map(|transfer| transfer.transfer_id.clone())
831                                    .unwrap_or_default(),
832                                transport_mode: complete
833                                    .transfer
834                                    .as_ref()
835                                    .map(|transfer| transport_mode_name(transfer.transport_mode))
836                                    .unwrap_or("native-pack")
837                                    .to_string(),
838                                resume_offset: complete
839                                    .transfer
840                                    .as_ref()
841                                    .map(|transfer| transfer.resume_offset)
842                                    .unwrap_or_default(),
843                                chunk_index: complete
844                                    .transfer
845                                    .as_ref()
846                                    .map(|transfer| transfer.chunk_index)
847                                    .unwrap_or_default(),
848                                checkpoint: complete
849                                    .transfer
850                                    .as_ref()
851                                    .map(|transfer| transfer.checkpoint.clone())
852                                    .unwrap_or_default(),
853                                is_complete: complete
854                                    .transfer
855                                    .as_ref()
856                                    .map(|transfer| transfer.is_complete)
857                                    .unwrap_or(false),
858                            },
859                            object_count: received,
860                            profile,
861                        });
862                    }
863
864                    profile.objects_received = received;
865                    return Ok(PullExchange {
866                        result: PullComplete {
867                            success: false,
868                            final_state,
869                            error: (!complete.error.is_empty()).then_some(complete.error),
870                            transfer_id: complete
871                                .transfer
872                                .as_ref()
873                                .map(|transfer| transfer.transfer_id.clone())
874                                .unwrap_or_default(),
875                            transport_mode: complete
876                                .transfer
877                                .as_ref()
878                                .map(|transfer| transport_mode_name(transfer.transport_mode))
879                                .unwrap_or("native-pack")
880                                .to_string(),
881                            resume_offset: complete
882                                .transfer
883                                .as_ref()
884                                .map(|transfer| transfer.resume_offset)
885                                .unwrap_or_default(),
886                            chunk_index: complete
887                                .transfer
888                                .as_ref()
889                                .map(|transfer| transfer.chunk_index)
890                                .unwrap_or_default(),
891                            checkpoint: complete
892                                .transfer
893                                .as_ref()
894                                .map(|transfer| transfer.checkpoint.clone())
895                                .unwrap_or_default(),
896                            is_complete: complete
897                                .transfer
898                                .as_ref()
899                                .map(|transfer| transfer.is_complete)
900                                .unwrap_or(false),
901                        },
902                        object_count: received,
903                        profile,
904                    });
905                }
906                _ => {}
907            }
908        }
909
910        Err(ProtocolError::InvalidState(format!(
911            "pull stream ended unexpectedly after receiving {received} packed objects"
912        )))
913    }
914}
915
916fn load_thread_metadata(
917    repo: &Repository,
918    target_thread: &str,
919    local_state: ChangeId,
920) -> Result<Option<SyncedThreadMetadata>, ProtocolError> {
921    let thread_manager = ThreadManager::new(repo.heddle_dir());
922    Ok(thread_manager.find_synced_record_by_thread(repo, target_thread, Some(local_state))?)
923}
924
925fn plan_pull_wants(
926    repo: &Repository,
927    remote_state: &ChangeId,
928    full_closure_available: bool,
929    objects_to_fetch: Vec<ObjectDescriptor>,
930    allow_partial_fetch: bool,
931) -> Result<PullWantPlan, ProtocolError> {
932    if full_closure_available {
933        return Ok(PullWantPlan {
934            wants: Vec::new(),
935            wanted_types: HashMap::new(),
936            want_full_closure: true,
937        });
938    }
939    let request_full_closure =
940        should_request_full_closure(repo, remote_state, allow_partial_fetch)?;
941    let mut wants = Vec::with_capacity(objects_to_fetch.len());
942    let mut wanted_types = HashMap::with_capacity(objects_to_fetch.len());
943
944    for descriptor in objects_to_fetch {
945        let info = parse_descriptor_to_info(descriptor)?;
946        let pack_id = match &info.id {
947            proto::ObjectId::Hash(hash) => PackObjectId::Hash(*hash),
948            proto::ObjectId::ChangeId(change_id) => PackObjectId::ChangeId(*change_id),
949        };
950        let include = if request_full_closure {
951            true
952        } else {
953            let has = proto::has_object(repo.store(), &info)?;
954            !(has || (allow_partial_fetch && matches!(info.obj_type, ObjectType::Blob)))
955        };
956
957        if include {
958            wanted_types.insert(pack_id, info.obj_type);
959            wants.push(object_descriptor_with_status(
960                &info,
961                ObjectAvailabilityStatus::Missing,
962                "requested by client",
963            ));
964        }
965    }
966
967    Ok(PullWantPlan {
968        wants,
969        wanted_types,
970        want_full_closure: false,
971    })
972}
973
974fn supports_compact_full_pull(
975    repo: &Repository,
976    allow_partial_fetch: bool,
977    exclude_states: &[ChangeId],
978) -> Result<bool, ProtocolError> {
979    if allow_partial_fetch || !exclude_states.is_empty() {
980        return Ok(false);
981    }
982    repo_looks_fresh(repo)
983}
984
985fn should_request_full_closure(
986    repo: &Repository,
987    remote_state: &ChangeId,
988    allow_partial_fetch: bool,
989) -> Result<bool, ProtocolError> {
990    if allow_partial_fetch || repo.store().has_state(remote_state)? {
991        return Ok(false);
992    }
993    repo_looks_fresh(repo)
994}
995
996fn repo_looks_fresh(repo: &Repository) -> Result<bool, ProtocolError> {
997    if repo.head()?.is_some() {
998        return Ok(false);
999    }
1000    if !repo.refs().list_threads()?.is_empty() || !repo.refs().list_markers()?.is_empty() {
1001        return Ok(false);
1002    }
1003    Ok(repo.missing_blobs()?.is_empty())
1004}
1005
1006fn infer_installed_hash_object_type(
1007    repo: &Repository,
1008    hash: &ContentHash,
1009) -> Result<ObjectType, ProtocolError> {
1010    let store = repo.store();
1011    if store.get_tree(hash)?.is_some() {
1012        return Ok(ObjectType::Tree);
1013    }
1014    if store
1015        .get_action(&objects::object::ActionId::from_hash(*hash))?
1016        .is_some()
1017    {
1018        return Ok(ObjectType::Action);
1019    }
1020    Ok(ObjectType::Blob)
1021}
1022
1023fn apply_marker_snapshot(repo: &Repository, checkpoint: &[u8]) -> Result<bool, ProtocolError> {
1024    const HEADER: &str = "heddle-markers-v1\n";
1025    if checkpoint.is_empty() {
1026        return Ok(false);
1027    }
1028    let payload = std::str::from_utf8(checkpoint)
1029        .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1030    let Some(lines) = payload.strip_prefix(HEADER) else {
1031        return Ok(false);
1032    };
1033
1034    for line in lines.lines() {
1035        if line.is_empty() {
1036            continue;
1037        }
1038        let Some((name, change_id)) = line.split_once('\t') else {
1039            return Err(ProtocolError::InvalidState(
1040                "invalid marker snapshot line".to_string(),
1041            ));
1042        };
1043        let change_id = ChangeId::parse(change_id)
1044            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
1045        if !repo.store().has_state(&change_id)? {
1046            continue;
1047        }
1048        match repo.refs().get_marker(name)? {
1049            Some(existing) if existing == change_id => {}
1050            Some(existing) => repo.refs().set_marker_cas(
1051                name,
1052                refs::RefExpectation::Value(existing),
1053                &change_id,
1054            )?,
1055            None => repo.refs().create_marker(name, &change_id)?,
1056        }
1057    }
1058
1059    Ok(true)
1060}
1061
1062fn change_id_string_to_bytes(s: &str) -> Vec<u8> {
1063    if s.is_empty() {
1064        return Vec::new();
1065    }
1066    objects::object::ChangeId::parse(s)
1067        .map(|id| id.as_bytes().to_vec())
1068        .unwrap_or_default()
1069}
1070
1071fn to_proto_thread_metadata(metadata: &SyncedThreadMetadata) -> ThreadMetadata {
1072    ThreadMetadata {
1073        name: metadata.thread.clone(),
1074        target_thread: metadata.target_thread.clone(),
1075        parent_thread: metadata.parent_thread.clone(),
1076        task: metadata.task.clone(),
1077        thread_mode: metadata.mode.to_string(),
1078        thread_state: metadata.state.to_string(),
1079        freshness: metadata.freshness.to_string(),
1080        base_state: change_id_string_to_bytes(&metadata.base_state),
1081        base_root: change_id_string_to_bytes(&metadata.base_root),
1082        current_state: metadata
1083            .current_state
1084            .as_deref()
1085            .map(change_id_string_to_bytes),
1086        merged_state: metadata
1087            .merged_state
1088            .as_deref()
1089            .map(change_id_string_to_bytes),
1090        changed_paths: metadata.changed_paths.clone(),
1091        impact_categories: metadata
1092            .impact_categories
1093            .iter()
1094            .map(ToString::to_string)
1095            .collect(),
1096        heavy_impact_paths: metadata.heavy_impact_paths.clone(),
1097        promotion_suggested: metadata.promotion_suggested,
1098        verification_summary: Some(ThreadVerificationSummary {
1099            tests_passed: metadata.verification_summary.tests_passed,
1100            tests_failed: metadata
1101                .verification_summary
1102                .tests_failed
1103                .unwrap_or_default(),
1104            coverage_pct: metadata.verification_summary.coverage_pct,
1105            lint_warnings: metadata.verification_summary.lint_warnings,
1106        }),
1107        confidence_summary: Some(ThreadConfidenceSummary {
1108            value: metadata.confidence_summary.value,
1109            band: metadata
1110                .confidence_summary
1111                .band
1112                .as_ref()
1113                .map(ToString::to_string),
1114        }),
1115        integration_policy_result: Some(ThreadIntegrationPolicy {
1116            status: metadata
1117                .integration_policy_result
1118                .status
1119                .clone()
1120                .unwrap_or_default(),
1121            reason: metadata
1122                .integration_policy_result
1123                .reason
1124                .clone()
1125                .unwrap_or_default(),
1126        }),
1127        created_at: Some(prost_types::Timestamp {
1128            seconds: metadata.created_at.timestamp(),
1129            nanos: metadata.created_at.timestamp_subsec_nanos() as i32,
1130        }),
1131        updated_at: Some(prost_types::Timestamp {
1132            seconds: metadata.updated_at.timestamp(),
1133            nanos: metadata.updated_at.timestamp_subsec_nanos() as i32,
1134        }),
1135    }
1136}
1137
1138struct PullExchange {
1139    result: PullComplete,
1140    object_count: usize,
1141    profile: PullProfile,
1142}
1143
1144fn mark_missing_blobs_for_state(
1145    repo: &Repository,
1146    state_id: ChangeId,
1147) -> Result<(), ProtocolError> {
1148    let state = repo
1149        .store()
1150        .get_state(&state_id)?
1151        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1152    let mut missing = collect_missing_blobs(repo, &state.tree);
1153    if let Some(context_root) = state.context.as_ref() {
1154        missing.extend(collect_missing_blobs(repo, context_root));
1155    }
1156    missing
1157        .into_iter()
1158        .try_for_each(|hash| repo.record_missing_blob(hash).map_err(ProtocolError::from))
1159}
1160
1161fn clear_missing_blobs_for_state(
1162    repo: &Repository,
1163    state_id: ChangeId,
1164) -> Result<(), ProtocolError> {
1165    let state = repo
1166        .store()
1167        .get_state(&state_id)?
1168        .ok_or_else(|| ProtocolError::ObjectNotFound(state_id.to_string_full()))?;
1169    let mut missing = collect_missing_blobs(repo, &state.tree);
1170    if let Some(context_root) = state.context.as_ref() {
1171        missing.extend(collect_missing_blobs(repo, context_root));
1172    }
1173    missing
1174        .into_iter()
1175        .try_for_each(|hash| repo.clear_missing_blob(&hash).map_err(ProtocolError::from))
1176}
1177
1178fn collect_missing_blobs(repo: &Repository, tree_hash: &ContentHash) -> Vec<ContentHash> {
1179    let mut missing = Vec::new();
1180    collect_missing_blobs_recursive(repo, tree_hash, &mut missing);
1181    missing
1182}
1183
1184fn collect_missing_blobs_recursive(
1185    repo: &Repository,
1186    tree_hash: &ContentHash,
1187    missing: &mut Vec<ContentHash>,
1188) {
1189    let Some(tree) = repo.store().get_tree(tree_hash).ok().flatten() else {
1190        return;
1191    };
1192    for entry in tree.entries() {
1193        match entry.entry_type {
1194            objects::object::EntryType::Blob | objects::object::EntryType::Symlink => {
1195                if !repo.store().has_blob(&entry.hash).unwrap_or(true) {
1196                    missing.push(entry.hash);
1197                }
1198            }
1199            objects::object::EntryType::Tree => {
1200                collect_missing_blobs_recursive(repo, &entry.hash, missing);
1201            }
1202        }
1203    }
1204}
1205
1206fn partial_fetch_status_for_repo(repo: &Repository) -> i32 {
1207    match repo.missing_blobs() {
1208        Ok(missing) if !missing.is_empty() => PartialFetchStatus::Enabled as i32,
1209        Ok(_) => PartialFetchStatus::Disabled as i32,
1210        Err(_) => PartialFetchStatus::Unspecified as i32,
1211    }
1212}
1213
1214fn pull_transfer_id(
1215    repo_path: &str,
1216    remote_thread: &str,
1217    local_thread: Option<&str>,
1218    depth: Option<u32>,
1219    target_state: Option<ChangeId>,
1220) -> String {
1221    format!(
1222        "pull:{repo_path}:{remote_thread}:{}:{depth:?}:{}",
1223        local_thread.unwrap_or_default(),
1224        target_state
1225            .map(|value| value.to_string_full())
1226            .unwrap_or_default()
1227    )
1228}
1229
1230fn push_transfer_id(repo_path: &str, local_state: ChangeId, target_thread: &str) -> String {
1231    format!(
1232        "push:{repo_path}:{}:{target_thread}",
1233        local_state.to_string_full()
1234    )
1235}
1236
1237fn encode_native_pack_messages(
1238    bundle: &proto::NativePackBundle,
1239    transfer_id: &str,
1240    chunk_size: usize,
1241    transport: &super::helpers::HostedTransportPolicy,
1242    transport_mode: TransportMode,
1243) -> Result<Vec<PushMessage>, ProtocolError> {
1244    let mut messages = Vec::new();
1245    let chunk_size = chunk_size.max(1);
1246
1247    let pack_total_chunks = proto::chunk_count(bundle.pack_data.len(), chunk_size);
1248    for chunk_index in 0..pack_total_chunks.max(1) {
1249        let Some((start, len)) =
1250            proto::chunk_bounds(bundle.pack_data.len(), chunk_size, chunk_index)
1251        else {
1252            break;
1253        };
1254        messages.push(PushMessage {
1255            body: Some(push_message::Body::Pack(PackChunk {
1256                stream_kind: PackStreamKind::Pack as i32,
1257                data: bundle.pack_data[start..start + len].to_vec(),
1258                transfer: Some(transport.transfer_checkpoint_with_mode(
1259                    transfer_id,
1260                    transport_mode,
1261                    chunk_index as u32,
1262                    start as u64,
1263                    chunk_index + 1 == pack_total_chunks,
1264                )),
1265                chunk_length: len as u32,
1266                is_final_chunk: chunk_index + 1 == pack_total_chunks,
1267            })),
1268        });
1269    }
1270
1271    let index_total_chunks = proto::chunk_count(bundle.index_data.len(), chunk_size);
1272    for chunk_index in 0..index_total_chunks.max(1) {
1273        let Some((start, len)) =
1274            proto::chunk_bounds(bundle.index_data.len(), chunk_size, chunk_index)
1275        else {
1276            break;
1277        };
1278        messages.push(PushMessage {
1279            body: Some(push_message::Body::Pack(PackChunk {
1280                stream_kind: PackStreamKind::Index as i32,
1281                data: bundle.index_data[start..start + len].to_vec(),
1282                transfer: Some(transport.transfer_checkpoint_with_mode(
1283                    transfer_id,
1284                    transport_mode,
1285                    chunk_index as u32,
1286                    start as u64,
1287                    chunk_index + 1 == index_total_chunks,
1288                )),
1289                chunk_length: len as u32,
1290                is_final_chunk: chunk_index + 1 == index_total_chunks,
1291            })),
1292        });
1293    }
1294    Ok(messages)
1295}
1296
1297fn preferred_transport_mode(
1298    transport: &super::helpers::HostedTransportPolicy,
1299    object_count: usize,
1300) -> TransportMode {
1301    let _ = transport;
1302    let _ = object_count;
1303    TransportMode::NativePack
1304}