Skip to main content

repo/
repository_maintenance.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::{
3    collections::HashSet,
4    fs, io,
5    path::{Path, PathBuf},
6};
7
8use chrono::{DateTime, Utc};
9use objects::{fs_atomic::write_file_atomic, object::ChangeId};
10use proto::{PlannedObject, StateClosureOptions, enumerate_state_closure_plan_with_options};
11use refs::{Head, RefSummaryIndexInspection};
12use serde::{Deserialize, Serialize};
13
14use super::{
15    CommitGraphIndex, Repository, Result, Tree,
16    commit_graph_persistence::{commit_graph_path, load_commit_graph},
17};
18use crate::{FsMonitorSettings, HeddleError, WorktreeIndex, WorktreeStatusOptions};
19
20#[derive(Debug, Clone, Serialize)]
21pub struct RepositoryPerformanceInspectionReport {
22    pub commit_graph: CommitGraphInspection,
23    pub worktree_index: WorktreeIndexInspection,
24    pub change_monitor: ChangeMonitorInspection,
25    #[serde(rename = "refs")]
26    pub ref_counts: RefCountsInspection,
27    pub ref_summary_index: RefSummaryIndexInspection,
28    pub pack_files: PackFilesInspection,
29    pub partial_fetch: PartialFetchInspection,
30    pub pull_planner_cache: PullPlannerCacheInspection,
31}
32
33#[derive(Debug, Clone, Serialize)]
34pub struct CommitGraphInspection {
35    pub present: bool,
36    pub node_count: usize,
37    pub bloom_covered_nodes: usize,
38    pub bytes: u64,
39    pub error: Option<String>,
40}
41
42#[derive(Debug, Clone, Serialize)]
43pub struct WorktreeIndexInspection {
44    pub present: bool,
45    pub file_entries: usize,
46    pub directory_entries: usize,
47    pub untracked_directory_entries: usize,
48    pub snapshot_bytes: u64,
49    pub journal_bytes: u64,
50    pub journal_ops: usize,
51    pub journal_replay_ms: u128,
52    pub error: Option<String>,
53}
54
55#[derive(Debug, Clone, Serialize)]
56pub struct ChangeMonitorInspection {
57    pub backend: String,
58    pub status: String,
59    pub reason: Option<String>,
60    pub changed_path_count: usize,
61}
62
63#[derive(Debug, Clone, Serialize)]
64pub struct RefCountsInspection {
65    pub total: usize,
66    pub threads: usize,
67    pub markers: usize,
68    pub remotes: usize,
69    pub remote_threads: usize,
70    pub packed_refs_present: bool,
71    pub packed_refs_bytes: u64,
72}
73
74#[derive(Debug, Clone, Serialize)]
75pub struct PackFilesInspection {
76    pub pack_count: usize,
77    pub index_count: usize,
78}
79
80#[derive(Debug, Clone, Serialize)]
81pub struct PartialFetchInspection {
82    pub count: usize,
83    pub missing_blob_count: usize,
84}
85
86#[derive(Debug, Clone, Serialize)]
87pub struct PullPlannerCacheInspection {
88    pub status: String,
89    pub present: bool,
90    pub manifest_count: usize,
91    pub planner_entry_count: usize,
92    pub total_bytes: u64,
93}
94
95#[derive(Debug, Clone, Serialize)]
96pub struct RepositoryMaintenanceRunReport {
97    pub rebuilt_commit_graph: bool,
98    pub rebuilt_ref_summary_index: bool,
99    pub rebuilt_worktree_index: bool,
100    pub refreshed_change_monitor: bool,
101    pub rebuilt_pull_planner_cache: bool,
102    pub pruned_pull_planner_entries: usize,
103    pub report: RepositoryPerformanceInspectionReport,
104}
105
106impl Repository {
107    pub fn inspect_performance(&self) -> Result<RepositoryPerformanceInspectionReport> {
108        self.inspect_performance_with_options(&self.maintenance_worktree_status_options())
109    }
110
111    pub fn inspect_performance_with_options(
112        &self,
113        options: &WorktreeStatusOptions,
114    ) -> Result<RepositoryPerformanceInspectionReport> {
115        let change_monitor = self.inspect_change_monitor_with_options(options)?;
116        let threads = self.refs().list_threads()?;
117        let markers = self.refs().list_markers()?;
118        let remotes = self.refs().list_remotes()?;
119        let missing_blobs = self.missing_blobs()?;
120        let remote_threads = remotes.iter().try_fold(0usize, |acc, remote| {
121            Ok::<usize, objects::error::HeddleError>(
122                acc + self.refs().list_remote_threads(remote)?.len(),
123            )
124        })?;
125        let packed_refs_path = self.heddle_dir.join("refs").join("packed-refs");
126        let packed_refs_bytes = file_len_or_zero(&packed_refs_path);
127
128        Ok(RepositoryPerformanceInspectionReport {
129            commit_graph: inspect_commit_graph(self.root()),
130            worktree_index: inspect_worktree_index(self.root()),
131            change_monitor: ChangeMonitorInspection {
132                backend: change_monitor.backend,
133                status: change_monitor.status,
134                reason: change_monitor.reason,
135                changed_path_count: change_monitor.changed_paths.len(),
136            },
137            ref_counts: RefCountsInspection {
138                total: threads.len() + markers.len() + remote_threads,
139                threads: threads.len(),
140                markers: markers.len(),
141                remotes: remotes.len(),
142                remote_threads,
143                packed_refs_present: packed_refs_path.exists(),
144                packed_refs_bytes,
145            },
146            ref_summary_index: self.refs().inspect_ref_summary_index()?,
147            pack_files: inspect_pack_files(&self.heddle_dir),
148            partial_fetch: PartialFetchInspection {
149                count: missing_blobs.len(),
150                missing_blob_count: missing_blobs.len(),
151            },
152            pull_planner_cache: inspect_pull_planner_cache(self.root()),
153        })
154    }
155
156    pub fn run_maintenance(&self) -> Result<RepositoryMaintenanceRunReport> {
157        self.run_maintenance_with_options(&self.maintenance_worktree_status_options())
158    }
159
160    pub fn run_maintenance_with_options(
161        &self,
162        options: &WorktreeStatusOptions,
163    ) -> Result<RepositoryMaintenanceRunReport> {
164        let mut rebuilt_commit_graph = false;
165        let mut rebuilt_worktree_index = false;
166        let refreshed_change_monitor;
167
168        let state_ids = self.store().list_states()?;
169        if !state_ids.is_empty() {
170            let mut graph = CommitGraphIndex::new(self);
171            for state_id in state_ids {
172                graph.ensure_loaded(state_id).map_err(anyhow_to_heddle)?;
173                graph
174                    .ensure_bloom_populated(state_id)
175                    .map_err(anyhow_to_heddle)?;
176            }
177            rebuilt_commit_graph = true;
178        }
179
180        let rebuilt_ref_summary_index = {
181            let ref_summary_index = self.refs().rebuild_ref_summary_index()?;
182            ref_summary_index.present && ref_summary_index.valid
183        };
184        let pull_planner_maintenance = maintain_pull_planner_cache(self)?;
185
186        if let Some(state) = self.current_state()? {
187            let tree = self
188                .store()
189                .get_tree(&state.tree)?
190                .unwrap_or_else(Tree::new);
191            self.compare_worktree_cached_detailed_with_options(&tree, options)?;
192            rebuilt_worktree_index = true;
193            refreshed_change_monitor = true;
194        } else {
195            self.inspect_change_monitor_with_options(options)?;
196            refreshed_change_monitor = true;
197        }
198
199        let report = self.inspect_performance_with_options(options)?;
200        Ok(RepositoryMaintenanceRunReport {
201            rebuilt_commit_graph,
202            rebuilt_ref_summary_index,
203            rebuilt_worktree_index,
204            refreshed_change_monitor,
205            rebuilt_pull_planner_cache: pull_planner_maintenance.rebuilt,
206            pruned_pull_planner_entries: pull_planner_maintenance.pruned_entries,
207            report,
208        })
209    }
210
211    fn maintenance_worktree_status_options(&self) -> WorktreeStatusOptions {
212        WorktreeStatusOptions {
213            fsmonitor: FsMonitorSettings::from(self.config.worktree.fsmonitor),
214        }
215    }
216}
217
218fn inspect_commit_graph(repo_root: &Path) -> CommitGraphInspection {
219    let path = commit_graph_path(repo_root);
220    let bytes = file_len_or_zero(&path);
221    match load_commit_graph(&path) {
222        Ok(Some(nodes)) => CommitGraphInspection {
223            present: true,
224            node_count: nodes.len(),
225            bloom_covered_nodes: nodes.values().filter(|node| node.bloom.is_some()).count(),
226            bytes,
227            error: None,
228        },
229        Ok(None) => CommitGraphInspection {
230            present: false,
231            node_count: 0,
232            bloom_covered_nodes: 0,
233            bytes: 0,
234            error: None,
235        },
236        Err(error) => CommitGraphInspection {
237            present: path.exists(),
238            node_count: 0,
239            bloom_covered_nodes: 0,
240            bytes,
241            error: Some(error.to_string()),
242        },
243    }
244}
245
246fn inspect_worktree_index(repo_root: &Path) -> WorktreeIndexInspection {
247    let index_path = repo_root.join(".heddle/state").join("index.bin");
248    let journal_path = repo_root.join(".heddle/state").join("index.journal");
249    match WorktreeIndex::load_profiled(&index_path) {
250        Ok((index, stats)) => WorktreeIndexInspection {
251            present: true,
252            file_entries: index.len(),
253            directory_entries: index.directory_len(),
254            untracked_directory_entries: index.untracked_directory_len(),
255            snapshot_bytes: stats.snapshot_bytes,
256            journal_bytes: stats.journal_bytes,
257            journal_ops: stats.journal_ops,
258            journal_replay_ms: stats.journal_replay_ms,
259            error: None,
260        },
261        Err(_error) if !index_path.exists() => WorktreeIndexInspection {
262            present: false,
263            file_entries: 0,
264            directory_entries: 0,
265            untracked_directory_entries: 0,
266            snapshot_bytes: 0,
267            journal_bytes: file_len_or_zero(&journal_path),
268            journal_ops: 0,
269            journal_replay_ms: 0,
270            error: None,
271        },
272        Err(error) => WorktreeIndexInspection {
273            present: true,
274            file_entries: 0,
275            directory_entries: 0,
276            untracked_directory_entries: 0,
277            snapshot_bytes: file_len_or_zero(&index_path),
278            journal_bytes: file_len_or_zero(&journal_path),
279            journal_ops: 0,
280            journal_replay_ms: 0,
281            error: Some(error.to_string()),
282        },
283    }
284}
285
286fn inspect_pack_files(heddle_dir: &Path) -> PackFilesInspection {
287    let packs_dir = heddle_dir.join("packs");
288    let mut pack_count = 0usize;
289    let mut index_count = 0usize;
290
291    if let Ok(entries) = fs::read_dir(&packs_dir) {
292        for entry in entries.flatten() {
293            match entry.path().extension().and_then(|ext| ext.to_str()) {
294                Some("pack") => pack_count += 1,
295                Some("idx") => index_count += 1,
296                _ => {}
297            }
298        }
299    }
300
301    PackFilesInspection {
302        pack_count,
303        index_count,
304    }
305}
306
307fn inspect_pull_planner_cache(repo_root: &Path) -> PullPlannerCacheInspection {
308    let pull_root = pull_planner_root(repo_root);
309    let manifest_path = pull_root.join("cold-clone-manifest.json");
310    let plans_dir = pull_root.join("plans");
311
312    let mut manifest_count = 0usize;
313    let mut planner_entry_count = 0usize;
314    let mut total_bytes = 0u64;
315
316    if manifest_path.exists() {
317        manifest_count = 1;
318        total_bytes += file_len_or_zero(&manifest_path);
319    }
320    if let Ok(entries) = fs::read_dir(&plans_dir) {
321        for entry in entries.flatten() {
322            if let Ok(file_type) = entry.file_type()
323                && file_type.is_file()
324            {
325                planner_entry_count += 1;
326                total_bytes += entry.metadata().map(|meta| meta.len()).unwrap_or(0);
327            }
328        }
329    }
330
331    let present = manifest_count > 0 || planner_entry_count > 0;
332    PullPlannerCacheInspection {
333        status: if present {
334            "present".to_string()
335        } else {
336            "absent".to_string()
337        },
338        present,
339        manifest_count,
340        planner_entry_count,
341        total_bytes,
342    }
343}
344
/// Outcome of `maintain_pull_planner_cache`; the default means "nothing done".
#[derive(Default)]
struct PullPlannerMaintenanceRun {
    /// Whether the cold-clone manifest (and its planner entries) were rewritten.
    rebuilt: bool,
    /// Number of planner entry files deleted as invalid.
    pruned_entries: usize,
}
350
/// Schema version written to, and required of, individual planner entry files.
const PULL_PLANNER_SCHEMA_VERSION: u32 = 1;
/// Schema version written to, and required of, the cold-clone manifest.
const COLD_CLONE_MANIFEST_SCHEMA_VERSION: u32 = 1;
353
354#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
355enum PullAvailabilityModeMirror {
356    Full,
357    LazyBlobOptional,
358}
359
360impl PullAvailabilityModeMirror {
361    fn as_file_fragment(self) -> &'static str {
362        match self {
363            Self::Full => "full",
364            Self::LazyBlobOptional => "lazy-blob-optional",
365        }
366    }
367}
368
369#[derive(Debug, Clone, Serialize, Deserialize)]
370struct StoredPullPlannerEntryMirror {
371    schema_version: u32,
372    generated_at: DateTime<Utc>,
373    repo_path: String,
374    remote_state_id: String,
375    depth: Option<u32>,
376    exclude_states: Vec<String>,
377    availability_mode: PullAvailabilityModeMirror,
378    object_count: u32,
379    planned_objects: Vec<PlannedObject>,
380}
381
382#[derive(Debug, Clone, Serialize, Deserialize)]
383struct StoredColdCloneManifestMirror {
384    schema_version: u32,
385    generated_at: DateTime<Utc>,
386    repo_path: String,
387    head: HeadSnapshotMirror,
388    markers: Vec<RefSnapshotMirror>,
389    threads: Vec<RefSnapshotMirror>,
390    thread_entries: Vec<ColdCloneThreadEntryMirror>,
391}
392
393#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
394struct HeadSnapshotMirror {
395    kind: String,
396    value: String,
397    head_state: Option<String>,
398}
399
400#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
401struct RefSnapshotMirror {
402    name: String,
403    state_id: String,
404}
405
406#[derive(Debug, Clone, Serialize, Deserialize)]
407struct ColdCloneThreadEntryMirror {
408    thread: String,
409    state_id: String,
410    planner_key_full: String,
411    planner_key_lazy: String,
412    object_count: u32,
413    full_closure_available: bool,
414}
415
416#[derive(Clone)]
417struct PullPlannerKeyMirror {
418    remote_state_id: ChangeId,
419    depth: Option<u32>,
420    exclude_states: Vec<ChangeId>,
421    availability_mode: PullAvailabilityModeMirror,
422}
423
424impl PullPlannerKeyMirror {
425    fn new(
426        remote_state_id: ChangeId,
427        depth: Option<u32>,
428        exclude_states: Vec<ChangeId>,
429        availability_mode: PullAvailabilityModeMirror,
430    ) -> Self {
431        Self {
432            remote_state_id,
433            depth,
434            exclude_states,
435            availability_mode,
436        }
437    }
438
439    fn file_name(&self) -> String {
440        let depth = self
441            .depth
442            .map(|value| value.to_string())
443            .unwrap_or_else(|| "full".to_string());
444        format!(
445            "{}--depth-{}--exclude-{}--{}.json",
446            self.remote_state_id.to_string_full(),
447            depth,
448            pull_planner_exclude_fingerprint(&self.exclude_states),
449            self.availability_mode.as_file_fragment()
450        )
451    }
452}
453
454fn maintain_pull_planner_cache(repo: &Repository) -> Result<PullPlannerMaintenanceRun> {
455    let pull_root = pull_planner_root(repo.root());
456    if !pull_root.exists() {
457        return Ok(PullPlannerMaintenanceRun::default());
458    }
459
460    let repo_path = discover_pull_planner_repo_path(repo.root())?;
461    let Some(repo_path) = repo_path else {
462        let pruned_entries = prune_invalid_pull_plans(repo, None)?;
463        return Ok(PullPlannerMaintenanceRun {
464            rebuilt: false,
465            pruned_entries,
466        });
467    };
468
469    let pruned_entries = prune_invalid_pull_plans(repo, Some(&repo_path))?;
470    let rebuilt = match load_pull_planner_manifest(repo.root()) {
471        Ok(Some(manifest))
472            if !pull_planner_manifest_needs_rebuild(repo, &repo_path, &manifest)? =>
473        {
474            false
475        }
476        _ => {
477            rebuild_pull_planner_manifest(repo, &repo_path)?;
478            true
479        }
480    };
481
482    Ok(PullPlannerMaintenanceRun {
483        rebuilt,
484        pruned_entries,
485    })
486}
487
488fn discover_pull_planner_repo_path(repo_root: &Path) -> Result<Option<String>> {
489    if let Ok(Some(manifest)) = load_pull_planner_manifest(repo_root) {
490        return Ok(Some(manifest.repo_path));
491    }
492
493    let plans_dir = pull_planner_plans_dir(repo_root);
494    if !plans_dir.exists() {
495        return Ok(None);
496    }
497    for entry in fs::read_dir(plans_dir)? {
498        let entry = entry?;
499        if !entry.file_type()?.is_file() {
500            continue;
501        }
502        let bytes = fs::read(entry.path())?;
503        if let Ok(plan) = serde_json::from_slice::<StoredPullPlannerEntryMirror>(&bytes) {
504            return Ok(Some(plan.repo_path));
505        }
506    }
507    Ok(None)
508}
509
510fn prune_invalid_pull_plans(repo: &Repository, repo_path: Option<&str>) -> Result<usize> {
511    let plans_dir = pull_planner_plans_dir(repo.root());
512    if !plans_dir.exists() {
513        return Ok(0);
514    }
515
516    let valid_states = repo
517        .store()
518        .list_states()?
519        .into_iter()
520        .map(|id| id.to_string_full())
521        .collect::<HashSet<_>>();
522    let mut pruned = 0usize;
523
524    for entry in fs::read_dir(&plans_dir)? {
525        let entry = entry?;
526        if !entry.file_type()?.is_file() {
527            continue;
528        }
529        let path = entry.path();
530        let remove = match fs::read(&path) {
531            Ok(bytes) => match serde_json::from_slice::<StoredPullPlannerEntryMirror>(&bytes) {
532                Ok(plan) => {
533                    plan.schema_version != PULL_PLANNER_SCHEMA_VERSION
534                        || repo_path.is_some_and(|expected| plan.repo_path != expected)
535                        || ChangeId::parse(&plan.remote_state_id).is_err()
536                        || !valid_states.contains(&plan.remote_state_id)
537                }
538                Err(_) => true,
539            },
540            Err(_) => true,
541        };
542        if remove {
543            fs::remove_file(path)?;
544            pruned += 1;
545        }
546    }
547
548    Ok(pruned)
549}
550
551fn pull_planner_manifest_needs_rebuild(
552    repo: &Repository,
553    repo_path: &str,
554    manifest: &StoredColdCloneManifestMirror,
555) -> Result<bool> {
556    let head = repo.refs().read_head()?;
557    let threads = load_ref_snapshots(repo, true)?;
558    let markers = load_ref_snapshots(repo, false)?;
559    if !manifest_matches(manifest, repo_path, &head, &threads, &markers) {
560        return Ok(true);
561    }
562    if manifest.thread_entries.len() != threads.len() {
563        return Ok(true);
564    }
565    let plans_dir = pull_planner_plans_dir(repo.root());
566    for thread in &threads {
567        let Some(entry) = manifest
568            .thread_entries
569            .iter()
570            .find(|entry| entry.thread == thread.name)
571        else {
572            return Ok(true);
573        };
574        let state_id = ChangeId::parse(&thread.state_id).map_err(|err| {
575            HeddleError::Io(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
576        })?;
577        let full_key =
578            PullPlannerKeyMirror::new(state_id, None, Vec::new(), PullAvailabilityModeMirror::Full)
579                .file_name();
580        let lazy_key = PullPlannerKeyMirror::new(
581            state_id,
582            None,
583            Vec::new(),
584            PullAvailabilityModeMirror::LazyBlobOptional,
585        )
586        .file_name();
587        if entry.state_id != thread.state_id
588            || entry.planner_key_full != full_key
589            || entry.planner_key_lazy != lazy_key
590            || !plans_dir.join(&full_key).exists()
591            || !plans_dir.join(&lazy_key).exists()
592        {
593            return Ok(true);
594        }
595    }
596    Ok(false)
597}
598
599fn rebuild_pull_planner_manifest(repo: &Repository, repo_path: &str) -> Result<()> {
600    let head = repo.refs().read_head()?;
601    let threads = load_ref_snapshots(repo, true)?;
602    let markers = load_ref_snapshots(repo, false)?;
603    let plans_dir = pull_planner_plans_dir(repo.root());
604    fs::create_dir_all(&plans_dir)?;
605
606    let mut thread_entries = Vec::with_capacity(threads.len());
607    for thread in &threads {
608        let state_id = ChangeId::parse(&thread.state_id).map_err(|err| {
609            HeddleError::Io(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
610        })?;
611        let full_key =
612            PullPlannerKeyMirror::new(state_id, None, Vec::new(), PullAvailabilityModeMirror::Full);
613        let lazy_key = PullPlannerKeyMirror::new(
614            state_id,
615            None,
616            Vec::new(),
617            PullAvailabilityModeMirror::LazyBlobOptional,
618        );
619        let full_plan = rebuild_pull_planner_entry(repo, repo_path, &full_key)?;
620        rebuild_pull_planner_entry(repo, repo_path, &lazy_key)?;
621        thread_entries.push(ColdCloneThreadEntryMirror {
622            thread: thread.name.clone(),
623            state_id: thread.state_id.clone(),
624            planner_key_full: full_key.file_name(),
625            planner_key_lazy: lazy_key.file_name(),
626            object_count: full_plan.object_count,
627            full_closure_available: true,
628        });
629    }
630
631    let manifest = StoredColdCloneManifestMirror {
632        schema_version: COLD_CLONE_MANIFEST_SCHEMA_VERSION,
633        generated_at: Utc::now(),
634        repo_path: repo_path.to_string(),
635        head: head_snapshot(&head, &threads),
636        markers,
637        threads,
638        thread_entries,
639    };
640    let bytes = serde_json::to_vec_pretty(&manifest).map_err(|err| {
641        HeddleError::Io(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
642    })?;
643    write_file_atomic(&pull_planner_manifest_path(repo.root()), &bytes)?;
644    Ok(())
645}
646
647fn rebuild_pull_planner_entry(
648    repo: &Repository,
649    repo_path: &str,
650    key: &PullPlannerKeyMirror,
651) -> Result<StoredPullPlannerEntryMirror> {
652    let planned_objects = enumerate_state_closure_plan_with_options(
653        repo.store(),
654        key.remote_state_id,
655        StateClosureOptions {
656            depth: key.depth,
657            exclude_states: key.exclude_states.clone(),
658        },
659    )
660    .map_err(|err| HeddleError::Io(io::Error::new(io::ErrorKind::InvalidData, err.to_string())))?;
661    let entry = StoredPullPlannerEntryMirror {
662        schema_version: PULL_PLANNER_SCHEMA_VERSION,
663        generated_at: Utc::now(),
664        repo_path: repo_path.to_string(),
665        remote_state_id: key.remote_state_id.to_string_full(),
666        depth: key.depth,
667        exclude_states: sorted_change_ids(&key.exclude_states),
668        availability_mode: key.availability_mode,
669        object_count: u32::try_from(planned_objects.len()).unwrap_or(u32::MAX),
670        planned_objects,
671    };
672    let bytes = serde_json::to_vec_pretty(&entry).map_err(|err| {
673        HeddleError::Io(io::Error::new(io::ErrorKind::InvalidData, err.to_string()))
674    })?;
675    write_file_atomic(
676        &pull_planner_plans_dir(repo.root()).join(key.file_name()),
677        &bytes,
678    )?;
679    Ok(entry)
680}
681
682fn load_pull_planner_manifest(
683    repo_root: &Path,
684) -> io::Result<Option<StoredColdCloneManifestMirror>> {
685    let path = pull_planner_manifest_path(repo_root);
686    if !path.exists() {
687        return Ok(None);
688    }
689    let bytes = fs::read(path)?;
690    let manifest: StoredColdCloneManifestMirror = serde_json::from_slice(&bytes)
691        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
692    if manifest.schema_version != COLD_CLONE_MANIFEST_SCHEMA_VERSION {
693        return Err(io::Error::new(
694            io::ErrorKind::InvalidData,
695            format!(
696                "unsupported cold clone manifest schema version {}",
697                manifest.schema_version
698            ),
699        ));
700    }
701    Ok(Some(manifest))
702}
703
/// Root of the pull planner cache: `<repo_root>/.heddle/state/derived-summaries/pull`.
fn pull_planner_root(repo_root: &Path) -> PathBuf {
    let mut root = repo_root.join(".heddle/state");
    root.push("derived-summaries");
    root.push("pull");
    root
}
710
711fn pull_planner_manifest_path(repo_root: &Path) -> PathBuf {
712    pull_planner_root(repo_root).join("cold-clone-manifest.json")
713}
714
715fn pull_planner_plans_dir(repo_root: &Path) -> PathBuf {
716    pull_planner_root(repo_root).join("plans")
717}
718
719fn sorted_change_ids(ids: &[ChangeId]) -> Vec<String> {
720    let mut values = ids.iter().map(ChangeId::to_string_full).collect::<Vec<_>>();
721    values.sort();
722    values
723}
724
725fn pull_planner_exclude_fingerprint(ids: &[ChangeId]) -> String {
726    let joined = sorted_change_ids(ids).join("\n");
727    objects::object::ContentHash::compute(joined.as_bytes())
728        .to_hex()
729        .chars()
730        .take(16)
731        .collect()
732}
733
734fn load_ref_snapshots(repo: &Repository, threads: bool) -> Result<Vec<RefSnapshotMirror>> {
735    let names = if threads {
736        repo.refs().list_threads()?
737    } else {
738        repo.refs().list_markers()?
739    };
740    let mut snapshots = Vec::with_capacity(names.len());
741    for name in names {
742        let state = if threads {
743            repo.refs().get_thread(&name)?
744        } else {
745            repo.refs().get_marker(&name)?
746        }
747        .ok_or_else(|| {
748            HeddleError::Io(io::Error::new(
749                io::ErrorKind::InvalidData,
750                format!(
751                    "{} '{}' disappeared while rebuilding pull planner manifest",
752                    if threads { "thread" } else { "marker" },
753                    name
754                ),
755            ))
756        })?;
757        snapshots.push(RefSnapshotMirror {
758            name,
759            state_id: state.to_string_full(),
760        });
761    }
762    snapshots.sort_by(|a, b| a.name.cmp(&b.name));
763    Ok(snapshots)
764}
765
766fn head_snapshot(head: &Head, threads: &[RefSnapshotMirror]) -> HeadSnapshotMirror {
767    match head {
768        Head::Attached { thread } => HeadSnapshotMirror {
769            kind: "attached".to_string(),
770            value: thread.clone(),
771            head_state: threads
772                .iter()
773                .find(|snapshot| snapshot.name == *thread)
774                .map(|snapshot| snapshot.state_id.clone()),
775        },
776        Head::Detached { state } => HeadSnapshotMirror {
777            kind: "detached".to_string(),
778            value: state.to_string_full(),
779            head_state: Some(state.to_string_full()),
780        },
781    }
782}
783
784fn manifest_matches(
785    manifest: &StoredColdCloneManifestMirror,
786    repo_path: &str,
787    head: &Head,
788    threads: &[RefSnapshotMirror],
789    markers: &[RefSnapshotMirror],
790) -> bool {
791    manifest.repo_path == repo_path
792        && manifest.head == head_snapshot(head, threads)
793        && manifest.threads == threads
794        && manifest.markers == markers
795}
796
/// Size of `path` in bytes (following symlinks), or 0 if it cannot be stat'ed.
fn file_len_or_zero(path: &Path) -> u64 {
    match fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_) => 0,
    }
}
802
803fn anyhow_to_heddle(error: anyhow::Error) -> HeddleError {
804    HeddleError::Io(std::io::Error::other(error.to_string()))
805}