1use super::{
13 display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14 format::{
15 RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
16 STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
17 },
18 recorder::{RunRecorder, StoreSizes},
19 retention::{
20 PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21 },
22 run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25 errors::{RunIdResolutionError, RunStoreError},
26 helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
27 redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35 collections::{BTreeMap, HashMap, HashSet},
36 fmt,
37 fs::{File, TryLockError},
38 io,
39 num::NonZero,
40 thread,
41 time::{Duration, Instant},
42};
43
/// Name of the lock file guarding concurrent access to the runs directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON index of all recorded runs.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
47#[derive(Clone, Copy, Debug)]
51pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54 pub fn new(path: &'a Utf8Path) -> Self {
56 Self(path)
57 }
58
59 pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
61 self.0.join(run_id.to_string())
62 }
63
64 pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
67 StoreRunFiles {
68 run_dir: self.run_dir(run_id),
69 }
70 }
71
72 pub fn as_path(self) -> &'a Utf8Path {
74 self.0
75 }
76}
77
/// Existence checks for the files that make up a recorded run.
///
/// Used as `&dyn RunFilesExist` by replayability checks, allowing callers to
/// substitute alternate implementations.
pub trait RunFilesExist {
    /// Returns true if the run's store zip file exists.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if the run's log file exists.
    fn run_log_exists(&self) -> bool;
}
87
88pub struct StoreRunFiles {
92 run_dir: Utf8PathBuf,
93}
94
95impl RunFilesExist for StoreRunFiles {
96 fn store_zip_exists(&self) -> bool {
97 self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
98 }
99
100 fn run_log_exists(&self) -> bool {
101 self.run_dir.join(RUN_LOG_FILE_NAME).exists()
102 }
103}
104
/// The top-level store of recorded runs, rooted at `<store_dir>/runs`.
///
/// Reading or writing the run list requires taking a lock via
/// [`RunStore::lock_shared`] or [`RunStore::lock_exclusive`].
#[derive(Debug)]
pub struct RunStore {
    // The `runs` subdirectory of the store directory.
    runs_dir: Utf8PathBuf,
}
114
115impl RunStore {
116 pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
120 let runs_dir = store_dir.join("runs");
121 std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
122 run_dir: runs_dir.clone(),
123 error,
124 })?;
125
126 Ok(Self { runs_dir })
127 }
128
129 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
131 StoreRunsDir(&self.runs_dir)
132 }
133
134 pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
142 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
143 let file = std::fs::OpenOptions::new()
144 .create(true)
145 .truncate(false)
146 .write(true)
147 .open(&lock_file_path)
148 .map_err(|error| RunStoreError::FileLock {
149 path: lock_file_path.clone(),
150 error,
151 })?;
152
153 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
154 let result = read_runs_json(&self.runs_dir)?;
155 let run_id_index = RunIdIndex::new(&result.runs);
156
157 Ok(SharedLockedRunStore {
158 runs_dir: StoreRunsDir(&self.runs_dir),
159 locked_file: DebugIgnore(file),
160 runs: result.runs,
161 write_permission: result.write_permission,
162 run_id_index,
163 })
164 }
165
166 pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
174 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
175 let file = std::fs::OpenOptions::new()
176 .create(true)
177 .truncate(false)
178 .write(true)
179 .open(&lock_file_path)
180 .map_err(|error| RunStoreError::FileLock {
181 path: lock_file_path.clone(),
182 error,
183 })?;
184
185 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
186 let result = read_runs_json(&self.runs_dir)?;
187
188 Ok(ExclusiveLockedRunStore {
189 runs_dir: StoreRunsDir(&self.runs_dir),
190 locked_file: DebugIgnore(file),
191 runs: result.runs,
192 last_pruned_at: result.last_pruned_at,
193 write_permission: result.write_permission,
194 })
195 }
196}
197
198#[derive(Debug)]
203pub struct ExclusiveLockedRunStore<'store> {
204 runs_dir: StoreRunsDir<'store>,
205 #[expect(dead_code)]
207 locked_file: DebugIgnore<File>,
208 runs: Vec<RecordedRunInfo>,
209 last_pruned_at: Option<DateTime<Utc>>,
210 write_permission: RunsJsonWritePermission,
211}
212
213impl<'store> ExclusiveLockedRunStore<'store> {
214 pub fn runs_dir(&self) -> StoreRunsDir<'store> {
216 self.runs_dir
217 }
218
219 pub fn write_permission(&self) -> RunsJsonWritePermission {
223 self.write_permission
224 }
225
226 pub fn complete_run(
236 &mut self,
237 run_id: ReportUuid,
238 sizes: StoreSizes,
239 status: RecordedRunStatus,
240 duration_secs: Option<f64>,
241 ) -> Result<bool, RunStoreError> {
242 if let RunsJsonWritePermission::Denied {
243 file_version,
244 max_supported_version,
245 } = self.write_permission
246 {
247 return Err(RunStoreError::FormatVersionTooNew {
248 file_version,
249 max_supported_version,
250 });
251 }
252
253 let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
254 if found {
255 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
256 }
257 Ok(found)
258 }
259
260 fn mark_run_completed_inner(
262 &mut self,
263 run_id: ReportUuid,
264 sizes: StoreSizes,
265 status: RecordedRunStatus,
266 duration_secs: Option<f64>,
267 ) -> bool {
268 for run in &mut self.runs {
269 if run.run_id == run_id {
270 run.sizes = RecordedSizes {
271 log: ComponentSizes {
272 compressed: sizes.log.compressed,
273 uncompressed: sizes.log.uncompressed,
274 entries: sizes.log.entries,
275 },
276 store: ComponentSizes {
277 compressed: sizes.store.compressed,
278 uncompressed: sizes.store.uncompressed,
279 entries: sizes.store.entries,
280 },
281 };
282 run.status = status;
283 run.duration_secs = duration_secs;
284 run.last_written_at = Local::now().fixed_offset();
285 return true;
286 }
287 }
288 false
289 }
290
291 pub fn prune(
308 &mut self,
309 policy: &RecordRetentionPolicy,
310 kind: PruneKind,
311 ) -> Result<PruneResult, RunStoreError> {
312 if let RunsJsonWritePermission::Denied {
313 file_version,
314 max_supported_version,
315 } = self.write_permission
316 {
317 return Err(RunStoreError::FormatVersionTooNew {
318 file_version,
319 max_supported_version,
320 });
321 }
322
323 let now = Utc::now();
324 let to_delete: HashSet<_> = policy
325 .compute_runs_to_delete(&self.runs, now)
326 .into_iter()
327 .collect();
328
329 let runs_dir = self.runs_dir();
330 let mut result = if to_delete.is_empty() {
331 PruneResult::default()
332 } else {
333 delete_runs(runs_dir, &mut self.runs, &to_delete)
334 };
335 result.kind = kind;
336
337 let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
338 delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
339
340 if result.deleted_count > 0 || result.orphans_deleted > 0 {
341 self.last_pruned_at = Some(now);
343 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
344 }
345
346 Ok(result)
347 }
348
349 pub fn prune_if_needed(
359 &mut self,
360 policy: &RecordRetentionPolicy,
361 ) -> Result<Option<PruneResult>, RunStoreError> {
362 const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
363 Some(d) => d,
364 None => panic!("1 day should always be a valid TimeDelta"),
365 };
366 const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
367
368 let now = Utc::now();
369
370 let time_since_last_prune = self
372 .last_pruned_at
373 .map(|last| now.signed_duration_since(last))
374 .unwrap_or(TimeDelta::MAX);
375
376 let should_prune = time_since_last_prune >= PRUNE_INTERVAL
377 || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
378
379 if should_prune {
380 Ok(Some(self.prune(policy, PruneKind::Implicit)?))
381 } else {
382 Ok(None)
383 }
384 }
385
386 #[expect(clippy::too_many_arguments)]
398 pub(crate) fn create_run_recorder(
399 mut self,
400 run_id: ReportUuid,
401 nextest_version: Version,
402 started_at: DateTime<FixedOffset>,
403 cli_args: Vec<String>,
404 build_scope_args: Vec<String>,
405 env_vars: BTreeMap<String, String>,
406 max_output_size: bytesize::ByteSize,
407 parent_run_id: Option<ReportUuid>,
408 ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
409 if let RunsJsonWritePermission::Denied {
410 file_version,
411 max_supported_version,
412 } = self.write_permission
413 {
414 return Err(RunStoreError::FormatVersionTooNew {
415 file_version,
416 max_supported_version,
417 });
418 }
419
420 let now = Local::now().fixed_offset();
426 let run = RecordedRunInfo {
427 run_id,
428 store_format_version: STORE_FORMAT_VERSION,
429 nextest_version,
430 started_at,
431 last_written_at: now,
432 duration_secs: None,
433 cli_args,
434 build_scope_args,
435 env_vars,
436 parent_run_id,
437 sizes: RecordedSizes::default(),
438 status: RecordedRunStatus::Incomplete,
439 };
440 self.runs.push(run);
441
442 if let Some(parent_run_id) = parent_run_id
444 && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
445 {
446 parent_run.last_written_at = now;
447 }
448
449 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
450
451 let index = RunIdIndex::new(&self.runs);
453 let unique_prefix = index
454 .shortest_unique_prefix(run_id)
455 .expect("run was just added to the list");
456
457 let run_dir = self.runs_dir().run_dir(run_id);
462
463 let recorder = RunRecorder::new(run_dir, max_output_size)?;
464 Ok((recorder, unique_prefix))
465 }
466}
467
/// Metadata about a single recorded run, as stored in runs.json.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// Unique identifier for the run.
    pub run_id: ReportUuid,
    /// Store format version the run was written with; checked against
    /// `STORE_FORMAT_VERSION` when deciding replayability.
    pub store_format_version: StoreFormatVersion,
    /// The nextest version that recorded the run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// Last time this entry was updated (set at creation, on completion, and
    /// when a child run is created).
    pub last_written_at: DateTime<FixedOffset>,
    /// Wall-clock duration of the run in seconds, if known.
    pub duration_secs: Option<f64>,
    /// Command-line arguments recorded for the run.
    pub cli_args: Vec<String>,
    /// Arguments related to the build scope — NOTE(review): confirm exact
    /// semantics against the code that populates this.
    pub build_scope_args: Vec<String>,
    /// Environment variables recorded for the run.
    pub env_vars: BTreeMap<String, String>,
    /// The run this one was derived from, if any.
    pub parent_run_id: Option<ReportUuid>,
    /// On-disk sizes of the run's log and store components.
    pub sizes: RecordedSizes,
    /// Completion status; starts out as `Incomplete` at creation.
    pub status: RecordedRunStatus,
}
508
/// Recorded sizes for both components of a run.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log component.
    pub log: ComponentSizes,
    /// Sizes for the store (zip) component.
    pub store: ComponentSizes,
}
517
/// Size accounting for a single run component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed (on-disk) size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries in the component.
    pub entries: u64,
}
528
529impl RecordedSizes {
530 pub fn total_compressed(&self) -> u64 {
532 self.log.compressed + self.store.compressed
533 }
534
535 pub fn total_uncompressed(&self) -> u64 {
537 self.log.uncompressed + self.store.uncompressed
538 }
539
540 pub fn total_entries(&self) -> u64 {
542 self.log.entries + self.store.entries
543 }
544}
545
/// The recorded completion status of a run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was created but never recorded a completion.
    Incomplete,
    /// The run completed.
    Completed(CompletedRunStats),
    /// The run was cancelled.
    Cancelled(CompletedRunStats),
    /// A stress run that completed.
    StressCompleted(StressCompletedRunStats),
    /// A stress run that was cancelled.
    StressCancelled(StressCompletedRunStats),
    /// A status written by a newer nextest version that this version does not
    /// understand.
    Unknown,
}
565
566impl RecordedRunStatus {
567 pub fn short_status_str(&self) -> &'static str {
569 match self {
570 Self::Incomplete => "incomplete",
571 Self::Unknown => "unknown",
572 Self::Completed(_) => "completed",
573 Self::Cancelled(_) => "cancelled",
574 Self::StressCompleted(_) => "stress completed",
575 Self::StressCancelled(_) => "stress cancelled",
576 }
577 }
578
579 pub fn exit_code(&self) -> Option<i32> {
581 match self {
582 Self::Incomplete | Self::Unknown => None,
583 Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
584 Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
585 }
586 }
587}
588
/// Final statistics for a completed or cancelled (non-stress) run.
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// The run count at the start of the run — NOTE(review): confirm against
    /// the writer whether this counts tests scheduled or attempted.
    pub initial_run_count: usize,
    /// Number of passed tests.
    pub passed: usize,
    /// Number of failed tests.
    pub failed: usize,
    /// Exit code the run finished with.
    pub exit_code: i32,
}
601
/// Final statistics for a stress run.
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The planned iteration count, if bounded — presumably `None` means
    /// unbounded; confirm against the stress runner.
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// Number of successful iterations.
    pub success_count: u32,
    /// Number of failed iterations.
    pub failed_count: u32,
    /// Exit code the run finished with.
    pub exit_code: i32,
}
617
/// Whether a recorded run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is complete and nothing blocks replay.
    Replayable,
    /// The run cannot be replayed, for the listed reasons.
    NotReplayable(Vec<NonReplayableReason>),
    /// Nothing blocks replay, but the run never recorded a completion status.
    Incomplete,
}
639
/// A reason a recorded run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version cannot be read by this nextest.
    StoreVersionIncompatible {
        /// The specific incompatibility found.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The run's store zip file is missing on disk.
    MissingStoreZip,
    /// The run's log file is missing on disk.
    MissingRunLog,
    /// The run's status was written by a newer nextest version.
    StatusUnknown,
}
659
660impl fmt::Display for NonReplayableReason {
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 match self {
663 Self::StoreVersionIncompatible { incompatibility } => {
664 write!(f, "store format version incompatible: {}", incompatibility)
665 }
666 Self::MissingStoreZip => {
667 write!(f, "store.zip is missing")
668 }
669 Self::MissingRunLog => {
670 write!(f, "run.log.zst is missing")
671 }
672 Self::StatusUnknown => {
673 write!(f, "run status is unknown (from a newer nextest version)")
674 }
675 }
676 }
677}
678
/// The result of resolving a run ID selector to a concrete run.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The fully resolved run ID.
    pub run_id: ReportUuid,
}
685
686impl RecordedRunStatus {
687 pub fn passed_count_width(&self) -> usize {
692 match self {
693 Self::Incomplete | Self::Unknown => 0,
694 Self::Completed(stats) | Self::Cancelled(stats) => {
695 usize_decimal_char_width(stats.passed)
696 }
697 Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
698 u32_decimal_char_width(stats.success_count)
700 }
701 }
702 }
703}
704
705impl RecordedRunInfo {
706 pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
718 let mut blocking = Vec::new();
719 let mut is_incomplete = false;
720
721 if let Err(incompatibility) = self
723 .store_format_version
724 .check_readable_by(STORE_FORMAT_VERSION)
725 {
726 blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
727 }
728
729 if !files.store_zip_exists() {
731 blocking.push(NonReplayableReason::MissingStoreZip);
732 }
733 if !files.run_log_exists() {
734 blocking.push(NonReplayableReason::MissingRunLog);
735 }
736
737 match &self.status {
739 RecordedRunStatus::Unknown => {
740 blocking.push(NonReplayableReason::StatusUnknown);
741 }
742 RecordedRunStatus::Incomplete => {
743 is_incomplete = true;
744 }
745 RecordedRunStatus::Completed(_)
746 | RecordedRunStatus::Cancelled(_)
747 | RecordedRunStatus::StressCompleted(_)
748 | RecordedRunStatus::StressCancelled(_) => {
749 }
751 }
752
753 if !blocking.is_empty() {
755 ReplayabilityStatus::NotReplayable(blocking)
756 } else if is_incomplete {
757 ReplayabilityStatus::Incomplete
758 } else {
759 ReplayabilityStatus::Replayable
760 }
761 }
762
763 pub fn display<'a>(
775 &'a self,
776 run_id_index: &'a RunIdIndex,
777 replayability: &'a ReplayabilityStatus,
778 alignment: RunListAlignment,
779 styles: &'a Styles,
780 redactor: &'a Redactor,
781 ) -> DisplayRecordedRunInfo<'a> {
782 DisplayRecordedRunInfo::new(
783 self,
784 run_id_index,
785 replayability,
786 alignment,
787 styles,
788 redactor,
789 )
790 }
791
792 pub fn display_detailed<'a>(
806 &'a self,
807 run_id_index: &'a RunIdIndex,
808 replayability: &'a ReplayabilityStatus,
809 now: DateTime<Utc>,
810 styles: &'a Styles,
811 theme_characters: &'a ThemeCharacters,
812 redactor: &'a Redactor,
813 ) -> DisplayRecordedRunInfoDetailed<'a> {
814 DisplayRecordedRunInfoDetailed::new(
815 self,
816 run_id_index,
817 replayability,
818 now,
819 styles,
820 theme_characters,
821 redactor,
822 )
823 }
824}
825
/// Data read back from runs.json.
struct ReadRunsJsonResult {
    // All recorded runs.
    runs: Vec<RecordedRunInfo>,
    // When a prune last ran, if recorded in the file.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
832
833fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
836 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
837 let file = match File::open(&runs_json_path) {
838 Ok(file) => file,
839 Err(error) => {
840 if error.kind() == io::ErrorKind::NotFound {
841 return Ok(ReadRunsJsonResult {
843 runs: Vec::new(),
844 last_pruned_at: None,
845 write_permission: RunsJsonWritePermission::Allowed,
846 });
847 } else {
848 return Err(RunStoreError::RunListRead {
849 path: runs_json_path,
850 error,
851 });
852 }
853 }
854 };
855
856 let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
857 path: runs_json_path.clone(),
858 error,
859 })?;
860
861 let list: RecordedRunList =
862 serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
863 path: runs_json_path,
864 error,
865 })?;
866 let write_permission = list.write_permission();
867 let data = list.into_data();
868 Ok(ReadRunsJsonResult {
869 runs: data.runs,
870 last_pruned_at: data.last_pruned_at,
871 write_permission,
872 })
873}
874
875fn write_runs_json(
877 runs_dir: &Utf8Path,
878 runs: &[RecordedRunInfo],
879 last_pruned_at: Option<DateTime<Utc>>,
880) -> Result<(), RunStoreError> {
881 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
882 let list = RecordedRunList::from_data(runs, last_pruned_at);
883
884 atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
885 .write(|file| {
886 let mut encoder = zstd::stream::Encoder::new(file, 3)?;
888 serde_json::to_writer_pretty(&mut encoder, &list)?;
889 encoder.finish()?;
890 Ok(())
891 })
892 .map_err(|error| RunStoreError::RunListWrite {
893 path: runs_json_path,
894 error,
895 })?;
896
897 Ok(())
898}
899
/// A run store holding a shared (reader) lock on the runs directory.
///
/// Created via [`RunStore::lock_shared`]; the lock is released when this
/// value is dropped.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    // The locked runs directory.
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    // Runs read from runs.json at lock-acquisition time.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.
    write_permission: RunsJsonWritePermission,
    // Prefix-lookup index over `runs`.
    run_id_index: RunIdIndex,
}
913
914impl<'store> SharedLockedRunStore<'store> {
915 pub fn into_snapshot(self) -> RunStoreSnapshot {
918 RunStoreSnapshot {
919 runs_dir: self.runs_dir.as_path().to_owned(),
920 runs: self.runs,
921 write_permission: self.write_permission,
922 run_id_index: self.run_id_index,
923 }
924 }
925}
926
/// An owned, unlocked snapshot of the run store's state.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    // Owned copy of the runs directory path.
    runs_dir: Utf8PathBuf,
    // Runs as of the time the snapshot was taken.
    runs: Vec<RecordedRunInfo>,
    // Whether this nextest version may rewrite runs.json.
    write_permission: RunsJsonWritePermission,
    // Prefix-lookup index over `runs`.
    run_id_index: RunIdIndex,
}
935
936impl RunStoreSnapshot {
937 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
939 StoreRunsDir(&self.runs_dir)
940 }
941
942 pub fn write_permission(&self) -> RunsJsonWritePermission {
946 self.write_permission
947 }
948
949 pub fn runs(&self) -> &[RecordedRunInfo] {
951 &self.runs
952 }
953
954 pub fn run_count(&self) -> usize {
956 self.runs.len()
957 }
958
959 pub fn total_size(&self) -> u64 {
961 self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
962 }
963
964 pub fn resolve_run_id(
970 &self,
971 selector: &RunIdSelector,
972 ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
973 match selector {
974 RunIdSelector::Latest => self.most_recent_run(),
975 RunIdSelector::Prefix(prefix) => {
976 let run_id = self.resolve_run_id_prefix(prefix)?;
977 Ok(ResolveRunIdResult { run_id })
978 }
979 }
980 }
981
982 fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
988 self.run_id_index.resolve_prefix(prefix).map_err(|err| {
989 match err {
990 PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
991 prefix: prefix.to_string(),
992 },
993 PrefixResolutionError::Ambiguous { count, candidates } => {
994 let mut candidates: Vec<_> = candidates
996 .into_iter()
997 .filter_map(|run_id| self.get_run(run_id).cloned())
998 .collect();
999 candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
1000 RunIdResolutionError::Ambiguous {
1001 prefix: prefix.to_string(),
1002 count,
1003 candidates,
1004 run_id_index: self.run_id_index.clone(),
1005 }
1006 }
1007 PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1008 prefix: prefix.to_string(),
1009 },
1010 }
1011 })
1012 }
1013
1014 pub fn run_id_index(&self) -> &RunIdIndex {
1016 &self.run_id_index
1017 }
1018
1019 pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1021 self.runs.iter().find(|r| r.run_id == run_id)
1022 }
1023
1024 pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1028 self.runs
1029 .iter()
1030 .max_by_key(|r| r.started_at)
1031 .map(|r| ResolveRunIdResult { run_id: r.run_id })
1032 .ok_or(RunIdResolutionError::NoRuns)
1033 }
1034
1035 pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1041 PrunePlan::compute(&self.runs, policy)
1042 }
1043}
1044
/// A snapshot paired with the precomputed replayability of each run.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    // The underlying snapshot.
    snapshot: &'a RunStoreSnapshot,
    // Replayability status, keyed by run ID, for every run in the snapshot.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    // The most recent run's ID, if any runs exist.
    latest_run_id: Option<ReportUuid>,
}
1056
1057impl<'a> SnapshotWithReplayability<'a> {
1058 pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1063 let runs_dir = snapshot.runs_dir();
1064 let replayability: HashMap<_, _> = snapshot
1065 .runs()
1066 .iter()
1067 .map(|run| {
1068 (
1069 run.run_id,
1070 run.check_replayability(&runs_dir.run_files(run.run_id)),
1071 )
1072 })
1073 .collect();
1074
1075 let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1077
1078 Self {
1079 snapshot,
1080 replayability,
1081 latest_run_id,
1082 }
1083 }
1084
1085 pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1087 self.snapshot
1088 }
1089
1090 pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1092 &self.replayability
1093 }
1094
1095 pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1102 self.replayability
1103 .get(&run_id)
1104 .expect("run ID should be in replayability map")
1105 }
1106
1107 pub fn latest_run_id(&self) -> Option<ReportUuid> {
1109 self.latest_run_id
1110 }
1111}
1112
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Builds a wrapper for tests in which every run is treated as replayable,
    /// without touching the filesystem.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let replayability: HashMap<_, _> = snapshot
            .runs()
            .iter()
            .map(|run| (run.run_id, ReplayabilityStatus::Replayable))
            .collect();

        // Reuse the production definition of "latest" rather than duplicating
        // the max-by-started_at logic here.
        let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1139
#[cfg(test)]
impl RunStoreSnapshot {
    /// Builds a snapshot for tests, rooted at a fixed fake directory with
    /// write permission always allowed.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        // RunIdIndex is already imported at the top of this module; the
        // previous function-local `use` was redundant.
        let run_id_index = RunIdIndex::new(&runs);
        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
        }
    }
}
1155
/// The kind of file lock to acquire on the runs lock file.
#[derive(Clone, Copy)]
enum LockKind {
    /// Shared (reader) lock.
    Shared,
    /// Exclusive (writer) lock.
    Exclusive,
}
1162
1163fn acquire_lock_with_retry(
1168 file: &File,
1169 lock_file_path: &Utf8Path,
1170 kind: LockKind,
1171) -> Result<(), RunStoreError> {
1172 const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1173 const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1174
1175 let start = Instant::now();
1176 loop {
1177 let result = match kind {
1178 LockKind::Shared => file.try_lock_shared(),
1179 LockKind::Exclusive => file.try_lock(),
1180 };
1181
1182 match result {
1183 Ok(()) => return Ok(()),
1184 Err(TryLockError::WouldBlock) => {
1185 if start.elapsed() >= LOCK_TIMEOUT {
1187 return Err(RunStoreError::FileLockTimeout {
1188 path: lock_file_path.to_owned(),
1189 timeout_secs: LOCK_TIMEOUT.as_secs(),
1190 });
1191 }
1192 thread::sleep(LOCK_RETRY_INTERVAL);
1193 }
1194 Err(TryLockError::Error(error)) => {
1195 return Err(RunStoreError::FileLock {
1197 path: lock_file_path.to_owned(),
1198 error,
1199 });
1200 }
1201 }
1202 }
1203}