1use super::{
13 display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14 format::{
15 RECORD_FORMAT_VERSION, RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission,
16 STORE_ZIP_FILE_NAME,
17 },
18 recorder::{RunRecorder, StoreSizes},
19 retention::{
20 PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21 },
22 run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25 errors::{RunIdResolutionError, RunStoreError},
26 helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
27 redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35 collections::{BTreeMap, HashMap, HashSet},
36 fmt,
37 fs::{File, TryLockError},
38 io,
39 num::NonZero,
40 thread,
41 time::{Duration, Instant},
42};
43
/// Name of the advisory lock file that guards concurrent access to the runs
/// directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// Name of the zstd-compressed JSON file that lists all recorded runs.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
/// A borrowed handle to the directory that holds all recorded runs.
///
/// Each run lives in its own subdirectory named after the run ID.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);

impl<'a> StoreRunsDir<'a> {
    /// Returns the directory for a particular run: `<runs-dir>/<run-id>`.
    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
        self.0.join(run_id.to_string())
    }

    /// Returns the underlying path to the runs directory.
    pub fn as_path(self) -> &'a Utf8Path {
        self.0
    }
}
64
/// The on-disk store of recorded runs.
///
/// All access to the run list goes through shared or exclusive locks
/// obtained via [`RunStore::lock_shared`] and [`RunStore::lock_exclusive`].
#[derive(Debug)]
pub struct RunStore {
    /// Directory containing one subdirectory per recorded run, plus the
    /// run list (`runs.json.zst`) and the lock file (`runs.lock`).
    runs_dir: Utf8PathBuf,
}
74
75impl RunStore {
76 pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
80 let runs_dir = store_dir.join("runs");
81 std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
82 run_dir: runs_dir.clone(),
83 error,
84 })?;
85
86 Ok(Self { runs_dir })
87 }
88
89 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
91 StoreRunsDir(&self.runs_dir)
92 }
93
94 pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
102 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
103 let file = std::fs::OpenOptions::new()
104 .create(true)
105 .truncate(false)
106 .write(true)
107 .open(&lock_file_path)
108 .map_err(|error| RunStoreError::FileLock {
109 path: lock_file_path.clone(),
110 error,
111 })?;
112
113 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
114 let result = read_runs_json(&self.runs_dir)?;
115 let run_id_index = RunIdIndex::new(&result.runs);
116
117 Ok(SharedLockedRunStore {
118 runs_dir: StoreRunsDir(&self.runs_dir),
119 locked_file: DebugIgnore(file),
120 runs: result.runs,
121 write_permission: result.write_permission,
122 run_id_index,
123 })
124 }
125
126 pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
134 let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
135 let file = std::fs::OpenOptions::new()
136 .create(true)
137 .truncate(false)
138 .write(true)
139 .open(&lock_file_path)
140 .map_err(|error| RunStoreError::FileLock {
141 path: lock_file_path.clone(),
142 error,
143 })?;
144
145 acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
146 let result = read_runs_json(&self.runs_dir)?;
147
148 Ok(ExclusiveLockedRunStore {
149 runs_dir: StoreRunsDir(&self.runs_dir),
150 locked_file: DebugIgnore(file),
151 runs: result.runs,
152 last_pruned_at: result.last_pruned_at,
153 write_permission: result.write_permission,
154 })
155 }
156}
157
/// A run store with an exclusive (write) lock held.
///
/// The OS-level lock is released when this value is dropped.
#[derive(Debug)]
pub struct ExclusiveLockedRunStore<'store> {
    /// The locked runs directory.
    runs_dir: StoreRunsDir<'store>,
    /// Held only so the file lock stays alive for the lifetime of this
    /// struct; never read directly.
    #[expect(dead_code)]
    locked_file: DebugIgnore<File>,
    /// The run list as read from disk under the lock.
    runs: Vec<RecordedRunInfo>,
    /// When pruning last ran, if ever recorded.
    last_pruned_at: Option<DateTime<Utc>>,
    /// Whether this process may rewrite the run list file.
    write_permission: RunsJsonWritePermission,
}
172
173impl<'store> ExclusiveLockedRunStore<'store> {
174 pub fn runs_dir(&self) -> StoreRunsDir<'store> {
176 self.runs_dir
177 }
178
179 pub fn write_permission(&self) -> RunsJsonWritePermission {
183 self.write_permission
184 }
185
186 pub fn complete_run(
196 &mut self,
197 run_id: ReportUuid,
198 sizes: StoreSizes,
199 status: RecordedRunStatus,
200 duration_secs: Option<f64>,
201 ) -> Result<bool, RunStoreError> {
202 if let RunsJsonWritePermission::Denied {
203 file_version,
204 max_supported_version,
205 } = self.write_permission
206 {
207 return Err(RunStoreError::FormatVersionTooNew {
208 file_version,
209 max_supported_version,
210 });
211 }
212
213 let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
214 if found {
215 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
216 }
217 Ok(found)
218 }
219
220 fn mark_run_completed_inner(
222 &mut self,
223 run_id: ReportUuid,
224 sizes: StoreSizes,
225 status: RecordedRunStatus,
226 duration_secs: Option<f64>,
227 ) -> bool {
228 for run in &mut self.runs {
229 if run.run_id == run_id {
230 run.sizes = RecordedSizes {
231 log: ComponentSizes {
232 compressed: sizes.log.compressed,
233 uncompressed: sizes.log.uncompressed,
234 entries: sizes.log.entries,
235 },
236 store: ComponentSizes {
237 compressed: sizes.store.compressed,
238 uncompressed: sizes.store.uncompressed,
239 entries: sizes.store.entries,
240 },
241 };
242 run.status = status;
243 run.duration_secs = duration_secs;
244 run.last_written_at = Local::now().fixed_offset();
245 return true;
246 }
247 }
248 false
249 }
250
251 pub fn prune(
268 &mut self,
269 policy: &RecordRetentionPolicy,
270 kind: PruneKind,
271 ) -> Result<PruneResult, RunStoreError> {
272 if let RunsJsonWritePermission::Denied {
273 file_version,
274 max_supported_version,
275 } = self.write_permission
276 {
277 return Err(RunStoreError::FormatVersionTooNew {
278 file_version,
279 max_supported_version,
280 });
281 }
282
283 let now = Utc::now();
284 let to_delete: HashSet<_> = policy
285 .compute_runs_to_delete(&self.runs, now)
286 .into_iter()
287 .collect();
288
289 let runs_dir = self.runs_dir();
290 let mut result = if to_delete.is_empty() {
291 PruneResult::default()
292 } else {
293 delete_runs(runs_dir, &mut self.runs, &to_delete)
294 };
295 result.kind = kind;
296
297 let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
298 delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
299
300 if result.deleted_count > 0 || result.orphans_deleted > 0 {
301 self.last_pruned_at = Some(now);
303 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
304 }
305
306 Ok(result)
307 }
308
309 pub fn prune_if_needed(
319 &mut self,
320 policy: &RecordRetentionPolicy,
321 ) -> Result<Option<PruneResult>, RunStoreError> {
322 const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
323 Some(d) => d,
324 None => panic!("1 day should always be a valid TimeDelta"),
325 };
326 const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
327
328 let now = Utc::now();
329
330 let time_since_last_prune = self
332 .last_pruned_at
333 .map(|last| now.signed_duration_since(last))
334 .unwrap_or(TimeDelta::MAX);
335
336 let should_prune = time_since_last_prune >= PRUNE_INTERVAL
337 || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
338
339 if should_prune {
340 Ok(Some(self.prune(policy, PruneKind::Implicit)?))
341 } else {
342 Ok(None)
343 }
344 }
345
346 #[expect(clippy::too_many_arguments)]
358 pub(crate) fn create_run_recorder(
359 mut self,
360 run_id: ReportUuid,
361 nextest_version: Version,
362 started_at: DateTime<FixedOffset>,
363 cli_args: Vec<String>,
364 build_scope_args: Vec<String>,
365 env_vars: BTreeMap<String, String>,
366 max_output_size: bytesize::ByteSize,
367 parent_run_id: Option<ReportUuid>,
368 ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
369 if let RunsJsonWritePermission::Denied {
370 file_version,
371 max_supported_version,
372 } = self.write_permission
373 {
374 return Err(RunStoreError::FormatVersionTooNew {
375 file_version,
376 max_supported_version,
377 });
378 }
379
380 let now = Local::now().fixed_offset();
386 let run = RecordedRunInfo {
387 run_id,
388 store_format_version: RECORD_FORMAT_VERSION,
389 nextest_version,
390 started_at,
391 last_written_at: now,
392 duration_secs: None,
393 cli_args,
394 build_scope_args,
395 env_vars,
396 parent_run_id,
397 sizes: RecordedSizes::default(),
398 status: RecordedRunStatus::Incomplete,
399 };
400 self.runs.push(run);
401
402 if let Some(parent_run_id) = parent_run_id
404 && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
405 {
406 parent_run.last_written_at = now;
407 }
408
409 write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
410
411 let index = RunIdIndex::new(&self.runs);
413 let unique_prefix = index
414 .shortest_unique_prefix(run_id)
415 .expect("run was just added to the list");
416
417 let run_dir = self.runs_dir().run_dir(run_id);
422
423 let recorder = RunRecorder::new(run_dir, max_output_size)?;
424 Ok((recorder, unique_prefix))
425 }
426}
427
/// Metadata about a single recorded run, as stored in the run list.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// Unique identifier for the run.
    pub run_id: ReportUuid,
    /// Version of the on-disk record format the run was written with.
    pub store_format_version: u32,
    /// The nextest version that recorded the run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When data for this run was last written.
    pub last_written_at: DateTime<FixedOffset>,
    /// Total run duration in seconds, if the run completed.
    pub duration_secs: Option<f64>,
    /// Command-line arguments the run was invoked with.
    pub cli_args: Vec<String>,
    /// Arguments defining the build scope of the run.
    pub build_scope_args: Vec<String>,
    /// Environment variables captured for the run.
    pub env_vars: BTreeMap<String, String>,
    /// The run this one is associated with as a parent, if any.
    pub parent_run_id: Option<ReportUuid>,
    /// On-disk sizes of the run's log and store components.
    pub sizes: RecordedSizes,
    /// Completion status of the run.
    pub status: RecordedRunStatus,
}
468
/// Sizes for a recorded run, broken down by component.
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log component.
    pub log: ComponentSizes,
    /// Sizes for the store component.
    pub store: ComponentSizes,
}

/// Size information for a single component of a recorded run.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Bytes on disk (after compression).
    pub compressed: u64,
    /// Bytes before compression.
    pub uncompressed: u64,
    /// Number of entries in the component.
    pub entries: u64,
}

impl RecordedSizes {
    /// Total compressed (on-disk) size across both components, in bytes.
    pub fn total_compressed(&self) -> u64 {
        let Self { log, store } = self;
        log.compressed + store.compressed
    }

    /// Total uncompressed size across both components, in bytes.
    pub fn total_uncompressed(&self) -> u64 {
        let Self { log, store } = self;
        log.uncompressed + store.uncompressed
    }

    /// Total number of entries across both components.
    pub fn total_entries(&self) -> u64 {
        let Self { log, store } = self;
        log.entries + store.entries
    }
}
505
/// The completion status of a recorded run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run never recorded a completion status.
    Incomplete,
    /// A run that ran to completion.
    Completed(CompletedRunStats),
    /// A run that was cancelled.
    Cancelled(CompletedRunStats),
    /// A stress run that ran to completion.
    StressCompleted(StressCompletedRunStats),
    /// A stress run that was cancelled.
    StressCancelled(StressCompletedRunStats),
    /// A status this version of nextest doesn't understand (written by a
    /// newer version).
    Unknown,
}

impl RecordedRunStatus {
    /// Returns a short human-readable label for this status.
    pub fn short_status_str(&self) -> &'static str {
        match self {
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
            Self::Incomplete => "incomplete",
            Self::Unknown => "unknown",
        }
    }

    /// Returns the recorded exit code, if one was captured.
    pub fn exit_code(&self) -> Option<i32> {
        let code = match self {
            Self::Completed(stats) | Self::Cancelled(stats) => stats.exit_code,
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => stats.exit_code,
            Self::Incomplete | Self::Unknown => return None,
        };
        Some(code)
    }
}

/// Summary statistics for a completed or cancelled run.
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// Number of tests initially slated to run.
    pub initial_run_count: usize,
    /// Number of tests that passed.
    pub passed: usize,
    /// Number of tests that failed.
    pub failed: usize,
    /// The run's exit code.
    pub exit_code: i32,
}

/// Summary statistics for a completed or cancelled stress run.
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The requested iteration count, if bounded.
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// Number of successful iterations.
    pub success_count: u32,
    /// Number of failed iterations.
    pub failed_count: u32,
    /// The run's exit code.
    pub exit_code: i32,
}
577
/// Whether a recorded run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is complete and its required files are present.
    Replayable,
    /// The run cannot be replayed, for the listed reasons.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run never recorded a completion status.
    Incomplete,
}
599
/// A reason a recorded run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run was written with a store format newer than this nextest
    /// version supports.
    StoreFormatTooNew {
        /// The format version the run was written with.
        run_version: u32,
        /// The newest format version supported here.
        max_supported: u32,
    },
    /// The store zip file is missing from the run directory.
    MissingStoreZip,
    /// The run log file is missing from the run directory.
    MissingRunLog,
    /// The run's status was recorded by a newer nextest version and isn't
    /// understood here.
    StatusUnknown,
}

impl fmt::Display for NonReplayableReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::StoreFormatTooNew {
                run_version,
                max_supported,
            } => write!(
                f,
                "store format version {run_version} is newer than supported (version {max_supported})"
            ),
            Self::MissingStoreZip => f.write_str("store.zip is missing"),
            Self::MissingRunLog => f.write_str("run.log.zst is missing"),
            Self::StatusUnknown => {
                f.write_str("run status is unknown (from a newer nextest version)")
            }
        }
    }
}
647
/// The result of resolving a run ID selector against the run list.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The fully resolved run ID.
    pub run_id: ReportUuid,
}
654
655impl RecordedRunStatus {
656 pub fn passed_count_width(&self) -> usize {
661 match self {
662 Self::Incomplete | Self::Unknown => 0,
663 Self::Completed(stats) | Self::Cancelled(stats) => {
664 usize_decimal_char_width(stats.passed)
665 }
666 Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
667 u32_decimal_char_width(stats.success_count)
669 }
670 }
671 }
672}
673
674impl RecordedRunInfo {
675 pub fn check_replayability(&self, runs_dir: StoreRunsDir<'_>) -> ReplayabilityStatus {
685 let mut blocking = Vec::new();
686 let mut is_incomplete = false;
687
688 if self.store_format_version > RECORD_FORMAT_VERSION {
690 blocking.push(NonReplayableReason::StoreFormatTooNew {
691 run_version: self.store_format_version,
692 max_supported: RECORD_FORMAT_VERSION,
693 });
694 }
695 let run_dir = runs_dir.run_dir(self.run_id);
700 let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
701 let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
702
703 if !store_zip_path.exists() {
704 blocking.push(NonReplayableReason::MissingStoreZip);
705 }
706 if !run_log_path.exists() {
707 blocking.push(NonReplayableReason::MissingRunLog);
708 }
709
710 match &self.status {
712 RecordedRunStatus::Unknown => {
713 blocking.push(NonReplayableReason::StatusUnknown);
714 }
715 RecordedRunStatus::Incomplete => {
716 is_incomplete = true;
717 }
718 RecordedRunStatus::Completed(_)
719 | RecordedRunStatus::Cancelled(_)
720 | RecordedRunStatus::StressCompleted(_)
721 | RecordedRunStatus::StressCancelled(_) => {
722 }
724 }
725
726 if !blocking.is_empty() {
728 ReplayabilityStatus::NotReplayable(blocking)
729 } else if is_incomplete {
730 ReplayabilityStatus::Incomplete
731 } else {
732 ReplayabilityStatus::Replayable
733 }
734 }
735
736 pub fn display<'a>(
748 &'a self,
749 run_id_index: &'a RunIdIndex,
750 replayability: &'a ReplayabilityStatus,
751 alignment: RunListAlignment,
752 styles: &'a Styles,
753 redactor: &'a Redactor,
754 ) -> DisplayRecordedRunInfo<'a> {
755 DisplayRecordedRunInfo::new(
756 self,
757 run_id_index,
758 replayability,
759 alignment,
760 styles,
761 redactor,
762 )
763 }
764
765 pub fn display_detailed<'a>(
779 &'a self,
780 run_id_index: &'a RunIdIndex,
781 replayability: &'a ReplayabilityStatus,
782 now: DateTime<Utc>,
783 styles: &'a Styles,
784 theme_characters: &'a ThemeCharacters,
785 redactor: &'a Redactor,
786 ) -> DisplayRecordedRunInfoDetailed<'a> {
787 DisplayRecordedRunInfoDetailed::new(
788 self,
789 run_id_index,
790 replayability,
791 now,
792 styles,
793 theme_characters,
794 redactor,
795 )
796 }
797}
798
/// Data read from the run list file, along with write-permission info.
struct ReadRunsJsonResult {
    /// All recorded runs.
    runs: Vec<RecordedRunInfo>,
    /// When pruning last ran, if ever recorded.
    last_pruned_at: Option<DateTime<Utc>>,
    /// Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
805
806fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
809 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
810 let file = match File::open(&runs_json_path) {
811 Ok(file) => file,
812 Err(error) => {
813 if error.kind() == io::ErrorKind::NotFound {
814 return Ok(ReadRunsJsonResult {
816 runs: Vec::new(),
817 last_pruned_at: None,
818 write_permission: RunsJsonWritePermission::Allowed,
819 });
820 } else {
821 return Err(RunStoreError::RunListRead {
822 path: runs_json_path,
823 error,
824 });
825 }
826 }
827 };
828
829 let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
830 path: runs_json_path.clone(),
831 error,
832 })?;
833
834 let list: RecordedRunList =
835 serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
836 path: runs_json_path,
837 error,
838 })?;
839 let write_permission = list.write_permission();
840 let data = list.into_data();
841 Ok(ReadRunsJsonResult {
842 runs: data.runs,
843 last_pruned_at: data.last_pruned_at,
844 write_permission,
845 })
846}
847
848fn write_runs_json(
850 runs_dir: &Utf8Path,
851 runs: &[RecordedRunInfo],
852 last_pruned_at: Option<DateTime<Utc>>,
853) -> Result<(), RunStoreError> {
854 let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
855 let list = RecordedRunList::from_data(runs, last_pruned_at);
856
857 atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
858 .write(|file| {
859 let mut encoder = zstd::stream::Encoder::new(file, 3)?;
861 serde_json::to_writer_pretty(&mut encoder, &list)?;
862 encoder.finish()?;
863 Ok(())
864 })
865 .map_err(|error| RunStoreError::RunListWrite {
866 path: runs_json_path,
867 error,
868 })?;
869
870 Ok(())
871}
872
/// A run store with a shared (read) lock held.
///
/// The OS-level lock is released when this value is dropped (including
/// after conversion into a snapshot).
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    /// The locked runs directory.
    runs_dir: StoreRunsDir<'store>,
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    /// The run list as read from disk under the lock.
    runs: Vec<RecordedRunInfo>,
    /// Whether this process may rewrite the run list file.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes.
    run_id_index: RunIdIndex,
}
886
887impl<'store> SharedLockedRunStore<'store> {
888 pub fn into_snapshot(self) -> RunStoreSnapshot {
891 RunStoreSnapshot {
892 runs_dir: self.runs_dir.as_path().to_owned(),
893 runs: self.runs,
894 write_permission: self.write_permission,
895 run_id_index: self.run_id_index,
896 }
897 }
898}
899
/// An owned, point-in-time view of the run store, valid after the lock that
/// produced it has been released.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    /// Path to the runs directory the snapshot was taken from.
    runs_dir: Utf8PathBuf,
    /// All recorded runs at the time of the snapshot.
    runs: Vec<RecordedRunInfo>,
    /// Whether this process may rewrite the run list file.
    write_permission: RunsJsonWritePermission,
    /// Index for resolving run ID prefixes.
    run_id_index: RunIdIndex,
}
908
909impl RunStoreSnapshot {
910 pub fn runs_dir(&self) -> StoreRunsDir<'_> {
912 StoreRunsDir(&self.runs_dir)
913 }
914
915 pub fn write_permission(&self) -> RunsJsonWritePermission {
919 self.write_permission
920 }
921
922 pub fn runs(&self) -> &[RecordedRunInfo] {
924 &self.runs
925 }
926
927 pub fn run_count(&self) -> usize {
929 self.runs.len()
930 }
931
932 pub fn total_size(&self) -> u64 {
934 self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
935 }
936
937 pub fn resolve_run_id(
943 &self,
944 selector: &RunIdSelector,
945 ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
946 match selector {
947 RunIdSelector::Latest => self.most_recent_run(),
948 RunIdSelector::Prefix(prefix) => {
949 let run_id = self.resolve_run_id_prefix(prefix)?;
950 Ok(ResolveRunIdResult { run_id })
951 }
952 }
953 }
954
955 fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
961 self.run_id_index.resolve_prefix(prefix).map_err(|err| {
962 match err {
963 PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
964 prefix: prefix.to_string(),
965 },
966 PrefixResolutionError::Ambiguous { count, candidates } => {
967 let mut candidates: Vec<_> = candidates
969 .into_iter()
970 .filter_map(|run_id| self.get_run(run_id).cloned())
971 .collect();
972 candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
973 RunIdResolutionError::Ambiguous {
974 prefix: prefix.to_string(),
975 count,
976 candidates,
977 run_id_index: self.run_id_index.clone(),
978 }
979 }
980 PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
981 prefix: prefix.to_string(),
982 },
983 }
984 })
985 }
986
987 pub fn run_id_index(&self) -> &RunIdIndex {
989 &self.run_id_index
990 }
991
992 pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
994 self.runs.iter().find(|r| r.run_id == run_id)
995 }
996
997 pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1001 self.runs
1002 .iter()
1003 .max_by_key(|r| r.started_at)
1004 .map(|r| ResolveRunIdResult { run_id: r.run_id })
1005 .ok_or(RunIdResolutionError::NoRuns)
1006 }
1007
1008 pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1014 PrunePlan::compute(&self.runs, policy)
1015 }
1016}
1017
/// A run store snapshot together with the precomputed replayability of
/// every run in it.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    /// The underlying snapshot.
    snapshot: &'a RunStoreSnapshot,
    /// Replayability status keyed by run ID; one entry per run.
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    /// ID of the run with the latest start time, if any runs exist.
    latest_run_id: Option<ReportUuid>,
}
1029
1030impl<'a> SnapshotWithReplayability<'a> {
1031 pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1036 let runs_dir = snapshot.runs_dir();
1037 let replayability: HashMap<_, _> = snapshot
1038 .runs()
1039 .iter()
1040 .map(|run| (run.run_id, run.check_replayability(runs_dir)))
1041 .collect();
1042
1043 let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1045
1046 Self {
1047 snapshot,
1048 replayability,
1049 latest_run_id,
1050 }
1051 }
1052
1053 pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1055 self.snapshot
1056 }
1057
1058 pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1060 &self.replayability
1061 }
1062
1063 pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1070 self.replayability
1071 .get(&run_id)
1072 .expect("run ID should be in replayability map")
1073 }
1074
1075 pub fn latest_run_id(&self) -> Option<ReportUuid> {
1077 self.latest_run_id
1078 }
1079}
1080
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Builds a wrapper for tests that marks every run replayable without
    /// touching the filesystem.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1107
#[cfg(test)]
impl RunStoreSnapshot {
    /// Builds a snapshot for tests with a fixed fake runs directory and
    /// write access allowed.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        use super::run_id_index::RunIdIndex;

        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            run_id_index: RunIdIndex::new(&runs),
            write_permission: RunsJsonWritePermission::Allowed,
            runs,
        }
    }
}
1123
/// The kind of file lock to acquire on the runs lock file.
#[derive(Clone, Copy)]
enum LockKind {
    /// A shared (read) lock; multiple holders may coexist.
    Shared,
    /// An exclusive (write) lock; a single holder at a time.
    Exclusive,
}
1130
1131fn acquire_lock_with_retry(
1136 file: &File,
1137 lock_file_path: &Utf8Path,
1138 kind: LockKind,
1139) -> Result<(), RunStoreError> {
1140 const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1141 const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1142
1143 let start = Instant::now();
1144 loop {
1145 let result = match kind {
1146 LockKind::Shared => file.try_lock_shared(),
1147 LockKind::Exclusive => file.try_lock(),
1148 };
1149
1150 match result {
1151 Ok(()) => return Ok(()),
1152 Err(TryLockError::WouldBlock) => {
1153 if start.elapsed() >= LOCK_TIMEOUT {
1155 return Err(RunStoreError::FileLockTimeout {
1156 path: lock_file_path.to_owned(),
1157 timeout_secs: LOCK_TIMEOUT.as_secs(),
1158 });
1159 }
1160 thread::sleep(LOCK_RETRY_INTERVAL);
1161 }
1162 Err(TryLockError::Error(error)) => {
1163 return Err(RunStoreError::FileLock {
1165 path: lock_file_path.to_owned(),
1166 error,
1167 });
1168 }
1169 }
1170 }
1171}