Skip to main content

nextest_runner/record/
store.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Run store management for nextest recordings.
5//!
6//! The run store is a directory that contains all recorded test runs. It provides:
7//!
8//! - A lock file for exclusive access during modifications.
9//! - A zstd-compressed JSON file (`runs.json.zst`) listing all recorded runs.
10//! - Individual directories for each run containing the archive and log.
11
12use super::{
13    display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14    format::{
15        RECORD_FORMAT_VERSION, RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission,
16        STORE_ZIP_FILE_NAME,
17    },
18    recorder::{RunRecorder, StoreSizes},
19    retention::{
20        PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21    },
22    run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25    errors::{RunIdResolutionError, RunStoreError},
26    helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
27    redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35    collections::{BTreeMap, HashMap, HashSet},
36    fmt,
37    fs::{File, TryLockError},
38    io,
39    num::NonZero,
40    thread,
41    time::{Duration, Instant},
42};
43
/// File name of the lock file, located directly inside the runs directory.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
/// File name of the compressed run list, located directly inside the runs directory.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
47/// A reference to the runs directory in a run store.
48///
49/// This provides methods to compute paths within the runs directory.
50#[derive(Clone, Copy, Debug)]
51pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54    /// Returns the path to a specific run's directory.
55    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
56        self.0.join(run_id.to_string())
57    }
58
59    /// Returns the underlying path to the runs directory.
60    pub fn as_path(self) -> &'a Utf8Path {
61        self.0
62    }
63}
64
65/// Manages the storage of recorded test runs.
66///
67/// The run store is a directory containing a list of recorded runs and their data.
68/// Use [`RunStore::lock_exclusive`] to acquire exclusive access before creating
69/// new runs.
70#[derive(Debug)]
71pub struct RunStore {
72    runs_dir: Utf8PathBuf,
73}
74
75impl RunStore {
76    /// Creates a new `RunStore` at the given directory.
77    ///
78    /// Creates the directory if it doesn't exist.
79    pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
80        let runs_dir = store_dir.join("runs");
81        std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
82            run_dir: runs_dir.clone(),
83            error,
84        })?;
85
86        Ok(Self { runs_dir })
87    }
88
89    /// Returns the runs directory.
90    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
91        StoreRunsDir(&self.runs_dir)
92    }
93
94    /// Acquires a shared lock on the run store for reading.
95    ///
96    /// Multiple readers can hold the shared lock simultaneously, but the shared
97    /// lock is exclusive with the exclusive lock (used for writing).
98    ///
99    /// Uses non-blocking lock attempts with retries to handle both brief
100    /// contention and filesystems where locking may not work (e.g., NFS).
101    pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
102        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
103        let file = std::fs::OpenOptions::new()
104            .create(true)
105            .truncate(false)
106            .write(true)
107            .open(&lock_file_path)
108            .map_err(|error| RunStoreError::FileLock {
109                path: lock_file_path.clone(),
110                error,
111            })?;
112
113        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
114        let result = read_runs_json(&self.runs_dir)?;
115        let run_id_index = RunIdIndex::new(&result.runs);
116
117        Ok(SharedLockedRunStore {
118            runs_dir: StoreRunsDir(&self.runs_dir),
119            locked_file: DebugIgnore(file),
120            runs: result.runs,
121            write_permission: result.write_permission,
122            run_id_index,
123        })
124    }
125
126    /// Acquires an exclusive lock on the run store.
127    ///
128    /// This lock should only be held for a short duration (just long enough to
129    /// add a run to the list and create its directory).
130    ///
131    /// Uses non-blocking lock attempts with retries to handle both brief
132    /// contention and filesystems where locking may not work (e.g., NFS).
133    pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
134        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
135        let file = std::fs::OpenOptions::new()
136            .create(true)
137            .truncate(false)
138            .write(true)
139            .open(&lock_file_path)
140            .map_err(|error| RunStoreError::FileLock {
141                path: lock_file_path.clone(),
142                error,
143            })?;
144
145        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
146        let result = read_runs_json(&self.runs_dir)?;
147
148        Ok(ExclusiveLockedRunStore {
149            runs_dir: StoreRunsDir(&self.runs_dir),
150            locked_file: DebugIgnore(file),
151            runs: result.runs,
152            last_pruned_at: result.last_pruned_at,
153            write_permission: result.write_permission,
154        })
155    }
156}
157
158/// A run store that has been locked for exclusive access.
159///
160/// The lifetime parameter ensures this isn't held for longer than the
161/// corresponding [`RunStore`].
162#[derive(Debug)]
163pub struct ExclusiveLockedRunStore<'store> {
164    runs_dir: StoreRunsDir<'store>,
165    // Held for RAII lock semantics; the lock is released when this struct is dropped.
166    #[expect(dead_code)]
167    locked_file: DebugIgnore<File>,
168    runs: Vec<RecordedRunInfo>,
169    last_pruned_at: Option<DateTime<Utc>>,
170    write_permission: RunsJsonWritePermission,
171}
172
173impl<'store> ExclusiveLockedRunStore<'store> {
174    /// Returns the runs directory.
175    pub fn runs_dir(&self) -> StoreRunsDir<'store> {
176        self.runs_dir
177    }
178
179    /// Returns whether this nextest can write to the runs.json.zst file.
180    ///
181    /// If the file has a newer format version than we support, writing is denied.
182    pub fn write_permission(&self) -> RunsJsonWritePermission {
183        self.write_permission
184    }
185
186    /// Marks a run as completed and persists the change to disk.
187    ///
188    /// Updates sizes, `status`, and `duration_secs` to the given values.
189    /// Returns `true` if the run was found and updated, `false` if no run
190    /// with the given ID exists (in which case nothing is persisted).
191    ///
192    /// Returns an error if writing is denied due to a format version mismatch.
193    ///
194    /// The status should not be `Incomplete` since we're completing the run.
195    pub fn complete_run(
196        &mut self,
197        run_id: ReportUuid,
198        sizes: StoreSizes,
199        status: RecordedRunStatus,
200        duration_secs: Option<f64>,
201    ) -> Result<bool, RunStoreError> {
202        if let RunsJsonWritePermission::Denied {
203            file_version,
204            max_supported_version,
205        } = self.write_permission
206        {
207            return Err(RunStoreError::FormatVersionTooNew {
208                file_version,
209                max_supported_version,
210            });
211        }
212
213        let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
214        if found {
215            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
216        }
217        Ok(found)
218    }
219
220    /// Updates a run's metadata in memory.
221    fn mark_run_completed_inner(
222        &mut self,
223        run_id: ReportUuid,
224        sizes: StoreSizes,
225        status: RecordedRunStatus,
226        duration_secs: Option<f64>,
227    ) -> bool {
228        for run in &mut self.runs {
229            if run.run_id == run_id {
230                run.sizes = RecordedSizes {
231                    log: ComponentSizes {
232                        compressed: sizes.log.compressed,
233                        uncompressed: sizes.log.uncompressed,
234                        entries: sizes.log.entries,
235                    },
236                    store: ComponentSizes {
237                        compressed: sizes.store.compressed,
238                        uncompressed: sizes.store.uncompressed,
239                        entries: sizes.store.entries,
240                    },
241                };
242                run.status = status;
243                run.duration_secs = duration_secs;
244                run.last_written_at = Local::now().fixed_offset();
245                return true;
246            }
247        }
248        false
249    }
250
251    /// Prunes runs according to the given retention policy.
252    ///
253    /// This method:
254    /// 1. Determines which runs to delete based on the policy
255    /// 2. Deletes those run directories from disk
256    /// 3. Deletes any orphaned directories not tracked in runs.json.zst
257    /// 4. Updates the run list in memory and on disk
258    ///
259    /// The `kind` parameter indicates whether this is explicit pruning (from a
260    /// user command) or implicit pruning (automatic during recording). This
261    /// affects how errors are displayed.
262    ///
263    /// Returns the result of the pruning operation, including any errors that
264    /// occurred while deleting individual runs.
265    ///
266    /// Returns an error if writing is denied due to a format version mismatch.
267    pub fn prune(
268        &mut self,
269        policy: &RecordRetentionPolicy,
270        kind: PruneKind,
271    ) -> Result<PruneResult, RunStoreError> {
272        if let RunsJsonWritePermission::Denied {
273            file_version,
274            max_supported_version,
275        } = self.write_permission
276        {
277            return Err(RunStoreError::FormatVersionTooNew {
278                file_version,
279                max_supported_version,
280            });
281        }
282
283        let now = Utc::now();
284        let to_delete: HashSet<_> = policy
285            .compute_runs_to_delete(&self.runs, now)
286            .into_iter()
287            .collect();
288
289        let runs_dir = self.runs_dir();
290        let mut result = if to_delete.is_empty() {
291            PruneResult::default()
292        } else {
293            delete_runs(runs_dir, &mut self.runs, &to_delete)
294        };
295        result.kind = kind;
296
297        let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
298        delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
299
300        if result.deleted_count > 0 || result.orphans_deleted > 0 {
301            // Update last_pruned_at since we performed pruning.
302            self.last_pruned_at = Some(now);
303            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
304        }
305
306        Ok(result)
307    }
308
309    /// Prunes runs if needed, based on time since last prune and limit thresholds.
310    ///
311    /// This method implements implicit pruning, which occurs:
312    /// - If more than 1 day has passed since the last prune, OR
313    /// - If any retention limit is exceeded by 1.5x.
314    ///
315    /// Use [`Self::prune`] for explicit pruning that always runs regardless of these conditions.
316    ///
317    /// Returns `Ok(None)` if pruning was skipped, `Ok(Some(result))` if pruning occurred.
318    pub fn prune_if_needed(
319        &mut self,
320        policy: &RecordRetentionPolicy,
321    ) -> Result<Option<PruneResult>, RunStoreError> {
322        const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
323            Some(d) => d,
324            None => panic!("1 day should always be a valid TimeDelta"),
325        };
326        const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
327
328        let now = Utc::now();
329
330        // Check if pruning is needed.
331        let time_since_last_prune = self
332            .last_pruned_at
333            .map(|last| now.signed_duration_since(last))
334            .unwrap_or(TimeDelta::MAX);
335
336        let should_prune = time_since_last_prune >= PRUNE_INTERVAL
337            || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
338
339        if should_prune {
340            Ok(Some(self.prune(policy, PruneKind::Implicit)?))
341        } else {
342            Ok(None)
343        }
344    }
345
346    /// Creates a run recorder for a new run.
347    ///
348    /// Adds the run to the list and creates its directory. Consumes self,
349    /// dropping the exclusive lock.
350    ///
351    /// `max_output_size` specifies the maximum size of a single output (stdout/stderr)
352    /// before truncation.
353    ///
354    /// Returns the recorder and the shortest unique prefix for the run ID (for
355    /// display purposes), or an error if writing is denied due to a format
356    /// version mismatch.
357    #[expect(clippy::too_many_arguments)]
358    pub(crate) fn create_run_recorder(
359        mut self,
360        run_id: ReportUuid,
361        nextest_version: Version,
362        started_at: DateTime<FixedOffset>,
363        cli_args: Vec<String>,
364        build_scope_args: Vec<String>,
365        env_vars: BTreeMap<String, String>,
366        max_output_size: bytesize::ByteSize,
367        parent_run_id: Option<ReportUuid>,
368    ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
369        if let RunsJsonWritePermission::Denied {
370            file_version,
371            max_supported_version,
372        } = self.write_permission
373        {
374            return Err(RunStoreError::FormatVersionTooNew {
375                file_version,
376                max_supported_version,
377            });
378        }
379
380        // Add to the list of runs before creating the directory. This ensures
381        // that if creation fails, an empty run directory isn't left behind. (It
382        // does mean that there may be spurious entries in the list of runs,
383        // which will be dealt with during pruning.)
384
385        let now = Local::now().fixed_offset();
386        let run = RecordedRunInfo {
387            run_id,
388            store_format_version: RECORD_FORMAT_VERSION,
389            nextest_version,
390            started_at,
391            last_written_at: now,
392            duration_secs: None,
393            cli_args,
394            build_scope_args,
395            env_vars,
396            parent_run_id,
397            sizes: RecordedSizes::default(),
398            status: RecordedRunStatus::Incomplete,
399        };
400        self.runs.push(run);
401
402        // If the parent run ID is set, update its last written at time.
403        if let Some(parent_run_id) = parent_run_id
404            && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
405        {
406            parent_run.last_written_at = now;
407        }
408
409        write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
410
411        // Compute the unique prefix now that the run is in the list.
412        let index = RunIdIndex::new(&self.runs);
413        let unique_prefix = index
414            .shortest_unique_prefix(run_id)
415            .expect("run was just added to the list");
416
417        // Create the run directory while still holding the lock. This prevents
418        // a race where another process could prune the newly-added run entry
419        // before the directory exists, leaving an orphaned directory. The lock
420        // is released when `self` is dropped.
421        let run_dir = self.runs_dir().run_dir(run_id);
422
423        let recorder = RunRecorder::new(run_dir, max_output_size)?;
424        Ok((recorder, unique_prefix))
425    }
426}
427
428/// Information about a recorded run.
429#[derive(Clone, Debug)]
430pub struct RecordedRunInfo {
431    /// The unique identifier for this run.
432    pub run_id: ReportUuid,
433    /// The format version of this run's store.zip archive.
434    ///
435    /// This allows checking replayability without opening the archive.
436    pub store_format_version: u32,
437    /// The version of nextest that created this run.
438    pub nextest_version: Version,
439    /// When the run started.
440    pub started_at: DateTime<FixedOffset>,
441    /// When this run was last written to.
442    ///
443    /// Used for LRU eviction. Updated when the run is created, when the run
444    /// completes, and when a rerun references this run.
445    pub last_written_at: DateTime<FixedOffset>,
446    /// Duration of the run in seconds.
447    ///
448    /// This is `None` for incomplete runs.
449    pub duration_secs: Option<f64>,
450    /// The command-line arguments used to invoke nextest.
451    pub cli_args: Vec<String>,
452    /// Build scope arguments (package and target selection).
453    ///
454    /// These determine which packages and targets are built. In a rerun chain,
455    /// these are inherited from the original run unless explicitly overridden.
456    pub build_scope_args: Vec<String>,
457    /// Environment variables that affect nextest behavior (NEXTEST_* and CARGO_*).
458    pub env_vars: BTreeMap<String, String>,
459    /// If this is a rerun, the ID of the parent run.
460    ///
461    /// This forms a chain for iterative fix-and-rerun workflows.
462    pub parent_run_id: Option<ReportUuid>,
463    /// Sizes broken down by component (log and store).
464    pub sizes: RecordedSizes,
465    /// The status and statistics for this run.
466    pub status: RecordedRunStatus,
467}
468
/// Sizes broken down by component (log and store).
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log (run.log.zst).
    pub log: ComponentSizes,
    /// Sizes for the store archive (store.zip).
    pub store: ComponentSizes,
}

/// Compressed and uncompressed sizes for a single component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries (records for log, files for store).
    pub entries: u64,
}

impl RecordedSizes {
    /// Returns the total compressed size (log + store).
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing, so implausibly
    /// large recorded values cannot cause a panic or a wrapped-around total.
    pub fn total_compressed(&self) -> u64 {
        self.log.compressed.saturating_add(self.store.compressed)
    }

    /// Returns the total uncompressed size (log + store).
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing.
    pub fn total_uncompressed(&self) -> u64 {
        self.log
            .uncompressed
            .saturating_add(self.store.uncompressed)
    }

    /// Returns the total number of entries (log records + store files).
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing.
    pub fn total_entries(&self) -> u64 {
        self.log.entries.saturating_add(self.store.entries)
    }
}
505
/// Outcome of a recorded run, together with its statistics where available.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was interrupted before completion.
    Incomplete,
    /// A normal test run in which all tests finished.
    Completed(CompletedRunStats),
    /// A normal test run that was cancelled before all tests finished.
    Cancelled(CompletedRunStats),
    /// A stress test run in which all iterations finished.
    StressCompleted(StressCompletedRunStats),
    /// A stress test run that was cancelled before all iterations finished.
    StressCancelled(StressCompletedRunStats),
    /// A status this nextest does not recognize.
    ///
    /// This variant is used for forward compatibility when reading runs.json.zst
    /// files created by newer nextest versions that may have new status types.
    Unknown,
}

impl RecordedRunStatus {
    /// Returns a short, human-readable label for this status.
    pub fn short_status_str(&self) -> &'static str {
        match *self {
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
            Self::Incomplete => "incomplete",
            Self::Unknown => "unknown",
        }
    }

    /// Returns the recorded exit code, or `None` for incomplete/unknown runs
    /// (which carry no statistics).
    pub fn exit_code(&self) -> Option<i32> {
        let code = match self {
            Self::Completed(stats) | Self::Cancelled(stats) => stats.exit_code,
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => stats.exit_code,
            Self::Incomplete | Self::Unknown => return None,
        };
        Some(code)
    }
}

/// Statistics for a normal test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// How many tests were expected to run.
    pub initial_run_count: usize,
    /// How many tests passed.
    pub passed: usize,
    /// How many tests failed (including exec failures and timeouts).
    pub failed: usize,
    /// Exit code of the run.
    pub exit_code: i32,
}

/// Statistics for a stress test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// How many stress iterations were expected to run, if known.
    ///
    /// This is `None` when the stress test was run without a fixed iteration count
    /// (e.g., `--stress-duration`).
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// How many stress iterations succeeded.
    pub success_count: u32,
    /// How many stress iterations failed.
    pub failed_count: u32,
    /// Exit code of the run.
    pub exit_code: i32,
}
577
578// ---
579// Replayability checking
580// ---
581
/// Outcome of checking whether a run can be replayed.
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is definitely replayable.
    ///
    /// There are no blocking reasons and no uncertain conditions.
    Replayable,
    /// The run is definitely not replayable.
    ///
    /// Carries at least one blocking reason.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run is incomplete, so it might or might not be replayable.
    ///
    /// The archive might be usable, but we'd need to open `store.zip` to
    /// verify all expected files are present.
    Incomplete,
}

/// A definite reason why a run cannot be replayed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version is newer than this nextest supports.
    ///
    /// This nextest version cannot read the archive format.
    StoreFormatTooNew {
        /// Format version of the run's archive.
        run_version: u32,
        /// Highest format version this nextest supports.
        max_supported: u32,
    },
    /// The run directory has no `store.zip` file.
    MissingStoreZip,
    /// The run directory has no `run.log.zst` file.
    MissingRunLog,
    /// The run status is `Unknown` (from a newer nextest version).
    ///
    /// We cannot safely replay since we don't understand the run's state.
    StatusUnknown,
}

impl fmt::Display for NonReplayableReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the version-mismatch reason needs formatting; every other
        // reason is a fixed message.
        let fixed_message = match self {
            Self::StoreFormatTooNew {
                run_version,
                max_supported,
            } => {
                return write!(
                    f,
                    "store format version {} is newer than supported (version {})",
                    run_version, max_supported
                );
            }
            Self::MissingStoreZip => "store.zip is missing",
            Self::MissingRunLog => "run.log.zst is missing",
            Self::StatusUnknown => "run status is unknown (from a newer nextest version)",
        };
        f.write_str(fixed_message)
    }
}
647
648/// Result of looking up a run by selector.
649#[derive(Clone, Copy, Debug)]
650pub struct ResolveRunIdResult {
651    /// The run ID.
652    pub run_id: ReportUuid,
653}
654
655impl RecordedRunStatus {
656    /// Returns the width (in decimal digits) needed to display the "passed" count.
657    ///
658    /// For non-completed runs (Incomplete, Unknown), returns 0 since they don't
659    /// display a passed count.
660    pub fn passed_count_width(&self) -> usize {
661        match self {
662            Self::Incomplete | Self::Unknown => 0,
663            Self::Completed(stats) | Self::Cancelled(stats) => {
664                usize_decimal_char_width(stats.passed)
665            }
666            Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
667                // Stress tests use u32, convert to usize for width calculation.
668                u32_decimal_char_width(stats.success_count)
669            }
670        }
671    }
672}
673
674impl RecordedRunInfo {
675    /// Checks whether this run can be replayed.
676    ///
677    /// This performs a comprehensive check of all conditions that might prevent
678    /// replay, including:
679    /// - Store format version compatibility
680    /// - Presence of required files (store.zip, run.log.zst)
681    /// - Run status (unknown, incomplete)
682    ///
683    /// The `runs_dir` parameter is used to check for file existence on disk.
684    pub fn check_replayability(&self, runs_dir: StoreRunsDir<'_>) -> ReplayabilityStatus {
685        let mut blocking = Vec::new();
686        let mut is_incomplete = false;
687
688        // Check store format version.
689        if self.store_format_version > RECORD_FORMAT_VERSION {
690            blocking.push(NonReplayableReason::StoreFormatTooNew {
691                run_version: self.store_format_version,
692                max_supported: RECORD_FORMAT_VERSION,
693            });
694        }
695        // Note: When we bump format versions, add a similar StoreFormatTooOld
696        // check here.
697
698        // Check for required files on disk.
699        let run_dir = runs_dir.run_dir(self.run_id);
700        let store_zip_path = run_dir.join(STORE_ZIP_FILE_NAME);
701        let run_log_path = run_dir.join(RUN_LOG_FILE_NAME);
702
703        if !store_zip_path.exists() {
704            blocking.push(NonReplayableReason::MissingStoreZip);
705        }
706        if !run_log_path.exists() {
707            blocking.push(NonReplayableReason::MissingRunLog);
708        }
709
710        // Check run status.
711        match &self.status {
712            RecordedRunStatus::Unknown => {
713                blocking.push(NonReplayableReason::StatusUnknown);
714            }
715            RecordedRunStatus::Incomplete => {
716                is_incomplete = true;
717            }
718            RecordedRunStatus::Completed(_)
719            | RecordedRunStatus::Cancelled(_)
720            | RecordedRunStatus::StressCompleted(_)
721            | RecordedRunStatus::StressCancelled(_) => {
722                // These statuses are fine for replay.
723            }
724        }
725
726        // Return the appropriate variant based on what we found.
727        if !blocking.is_empty() {
728            ReplayabilityStatus::NotReplayable(blocking)
729        } else if is_incomplete {
730            ReplayabilityStatus::Incomplete
731        } else {
732            ReplayabilityStatus::Replayable
733        }
734    }
735
736    /// Returns a display wrapper for this run.
737    ///
738    /// The `run_id_index` is used for computing shortest unique prefixes,
739    /// which are highlighted differently in the output (similar to jj).
740    ///
741    /// The `alignment` parameter controls column alignment when displaying a
742    /// list of runs. Use [`RunListAlignment::from_runs`] to precompute
743    /// alignment for a set of runs.
744    ///
745    /// The `redactor` parameter, if provided, redacts timestamps, durations,
746    /// and sizes for snapshot testing while preserving column alignment.
747    pub fn display<'a>(
748        &'a self,
749        run_id_index: &'a RunIdIndex,
750        replayability: &'a ReplayabilityStatus,
751        alignment: RunListAlignment,
752        styles: &'a Styles,
753        redactor: &'a Redactor,
754    ) -> DisplayRecordedRunInfo<'a> {
755        DisplayRecordedRunInfo::new(
756            self,
757            run_id_index,
758            replayability,
759            alignment,
760            styles,
761            redactor,
762        )
763    }
764
765    /// Returns a detailed display wrapper for this run.
766    ///
767    /// Unlike [`Self::display`] which shows a compact table row, this provides
768    /// a multi-line detailed view suitable for the `store info` command.
769    ///
770    /// The `replayability` parameter should be computed by the caller using
771    /// [`Self::check_replayability`].
772    ///
773    /// The `now` parameter is the current time, used to compute relative
774    /// durations (e.g. "30s ago").
775    ///
776    /// The `redactor` parameter redacts paths, timestamps, durations, and sizes
777    /// for snapshot testing. Use `Redactor::noop()` if no redaction is needed.
778    pub fn display_detailed<'a>(
779        &'a self,
780        run_id_index: &'a RunIdIndex,
781        replayability: &'a ReplayabilityStatus,
782        now: DateTime<Utc>,
783        styles: &'a Styles,
784        theme_characters: &'a ThemeCharacters,
785        redactor: &'a Redactor,
786    ) -> DisplayRecordedRunInfoDetailed<'a> {
787        DisplayRecordedRunInfoDetailed::new(
788            self,
789            run_id_index,
790            replayability,
791            now,
792            styles,
793            theme_characters,
794            redactor,
795        )
796    }
797}
798
799/// Result of reading runs.json.zst.
800struct ReadRunsJsonResult {
801    runs: Vec<RecordedRunInfo>,
802    last_pruned_at: Option<DateTime<Utc>>,
803    write_permission: RunsJsonWritePermission,
804}
805
806/// Reads and deserializes `runs.json.zst`, converting to the internal
807/// representation.
808fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
809    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
810    let file = match File::open(&runs_json_path) {
811        Ok(file) => file,
812        Err(error) => {
813            if error.kind() == io::ErrorKind::NotFound {
814                // The file doesn't exist yet, so we can write a new one.
815                return Ok(ReadRunsJsonResult {
816                    runs: Vec::new(),
817                    last_pruned_at: None,
818                    write_permission: RunsJsonWritePermission::Allowed,
819                });
820            } else {
821                return Err(RunStoreError::RunListRead {
822                    path: runs_json_path,
823                    error,
824                });
825            }
826        }
827    };
828
829    let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
830        path: runs_json_path.clone(),
831        error,
832    })?;
833
834    let list: RecordedRunList =
835        serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
836            path: runs_json_path,
837            error,
838        })?;
839    let write_permission = list.write_permission();
840    let data = list.into_data();
841    Ok(ReadRunsJsonResult {
842        runs: data.runs,
843        last_pruned_at: data.last_pruned_at,
844        write_permission,
845    })
846}
847
848/// Serializes and writes runs.json.zst from internal representation.
849fn write_runs_json(
850    runs_dir: &Utf8Path,
851    runs: &[RecordedRunInfo],
852    last_pruned_at: Option<DateTime<Utc>>,
853) -> Result<(), RunStoreError> {
854    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
855    let list = RecordedRunList::from_data(runs, last_pruned_at);
856
857    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
858        .write(|file| {
859            // Use compression level 3, consistent with other zstd usage in the crate.
860            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
861            serde_json::to_writer_pretty(&mut encoder, &list)?;
862            encoder.finish()?;
863            Ok(())
864        })
865        .map_err(|error| RunStoreError::RunListWrite {
866            path: runs_json_path,
867            error,
868        })?;
869
870    Ok(())
871}
872
873/// A run store that has been locked for shared (read-only) access.
874///
875/// Multiple readers can hold this lock simultaneously, but it is exclusive
876/// with the exclusive lock used for writing.
877#[derive(Debug)]
878pub struct SharedLockedRunStore<'store> {
879    runs_dir: StoreRunsDir<'store>,
880    #[expect(dead_code, reason = "held for lock duration")]
881    locked_file: DebugIgnore<File>,
882    runs: Vec<RecordedRunInfo>,
883    write_permission: RunsJsonWritePermission,
884    run_id_index: RunIdIndex,
885}
886
887impl<'store> SharedLockedRunStore<'store> {
888    /// Returns a snapshot of the runs data, consuming self and releasing the
889    /// lock.
890    pub fn into_snapshot(self) -> RunStoreSnapshot {
891        RunStoreSnapshot {
892            runs_dir: self.runs_dir.as_path().to_owned(),
893            runs: self.runs,
894            write_permission: self.write_permission,
895            run_id_index: self.run_id_index,
896        }
897    }
898}
899
900/// A snapshot of run store data.
901#[derive(Debug)]
902pub struct RunStoreSnapshot {
903    runs_dir: Utf8PathBuf,
904    runs: Vec<RecordedRunInfo>,
905    write_permission: RunsJsonWritePermission,
906    run_id_index: RunIdIndex,
907}
908
909impl RunStoreSnapshot {
910    /// Returns the runs directory.
911    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
912        StoreRunsDir(&self.runs_dir)
913    }
914
915    /// Returns whether this nextest can write to the runs.json.zst file.
916    ///
917    /// If the file has a newer format version than we support, writing is denied.
918    pub fn write_permission(&self) -> RunsJsonWritePermission {
919        self.write_permission
920    }
921
922    /// Returns a list of recorded runs.
923    pub fn runs(&self) -> &[RecordedRunInfo] {
924        &self.runs
925    }
926
927    /// Returns the number of recorded runs.
928    pub fn run_count(&self) -> usize {
929        self.runs.len()
930    }
931
932    /// Returns the total compressed size of all recorded runs in bytes.
933    pub fn total_size(&self) -> u64 {
934        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
935    }
936
937    /// Resolves a run ID selector to a run result.
938    ///
939    /// For [`RunIdSelector::Latest`], returns the most recent run by start
940    /// time.
941    /// For [`RunIdSelector::Prefix`], resolves the prefix to a specific run.
942    pub fn resolve_run_id(
943        &self,
944        selector: &RunIdSelector,
945    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
946        match selector {
947            RunIdSelector::Latest => self.most_recent_run(),
948            RunIdSelector::Prefix(prefix) => {
949                let run_id = self.resolve_run_id_prefix(prefix)?;
950                Ok(ResolveRunIdResult { run_id })
951            }
952        }
953    }
954
955    /// Resolves a run ID prefix to a full UUID.
956    ///
957    /// The prefix must be a valid hexadecimal string. If the prefix matches
958    /// exactly one run, that run's UUID is returned. Otherwise, an error is
959    /// returned indicating whether no runs matched or multiple runs matched.
960    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
961        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
962            match err {
963                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
964                    prefix: prefix.to_string(),
965                },
966                PrefixResolutionError::Ambiguous { count, candidates } => {
967                    // Convert UUIDs to full RecordedRunInfo and sort by start time (most recent first).
968                    let mut candidates: Vec<_> = candidates
969                        .into_iter()
970                        .filter_map(|run_id| self.get_run(run_id).cloned())
971                        .collect();
972                    candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
973                    RunIdResolutionError::Ambiguous {
974                        prefix: prefix.to_string(),
975                        count,
976                        candidates,
977                        run_id_index: self.run_id_index.clone(),
978                    }
979                }
980                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
981                    prefix: prefix.to_string(),
982                },
983            }
984        })
985    }
986
987    /// Returns the run ID index for computing shortest unique prefixes.
988    pub fn run_id_index(&self) -> &RunIdIndex {
989        &self.run_id_index
990    }
991
992    /// Looks up a run by its exact UUID.
993    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
994        self.runs.iter().find(|r| r.run_id == run_id)
995    }
996
997    /// Returns the most recent run by start time.
998    ///
999    /// Returns an error if there are no runs at all.
1000    pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1001        self.runs
1002            .iter()
1003            .max_by_key(|r| r.started_at)
1004            .map(|r| ResolveRunIdResult { run_id: r.run_id })
1005            .ok_or(RunIdResolutionError::NoRuns)
1006    }
1007
1008    /// Computes which runs would be deleted by a prune operation.
1009    ///
1010    /// This is used for dry-run mode to show what would be deleted without
1011    /// actually deleting anything. Returns a [`PrunePlan`] containing the runs
1012    /// that would be deleted, sorted by start time (oldest first).
1013    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1014        PrunePlan::compute(&self.runs, policy)
1015    }
1016}
1017
1018/// A snapshot paired with precomputed replayability status for all runs.
1019///
1020/// This struct maintains the invariant that every run in the snapshot has a
1021/// corresponding entry in the replayability map. Use [`Self::new`] to compute
1022/// replayability for all runs, or `Self::new_for_test` for testing.
1023#[derive(Debug)]
1024pub struct SnapshotWithReplayability<'a> {
1025    snapshot: &'a RunStoreSnapshot,
1026    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
1027    latest_run_id: Option<ReportUuid>,
1028}
1029
1030impl<'a> SnapshotWithReplayability<'a> {
1031    /// Creates a new snapshot with replayability by checking all runs.
1032    ///
1033    /// This computes [`ReplayabilityStatus`] for each run by checking file
1034    /// existence and format versions.
1035    pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1036        let runs_dir = snapshot.runs_dir();
1037        let replayability: HashMap<_, _> = snapshot
1038            .runs()
1039            .iter()
1040            .map(|run| (run.run_id, run.check_replayability(runs_dir)))
1041            .collect();
1042
1043        // Find the latest run by time.
1044        let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1045
1046        Self {
1047            snapshot,
1048            replayability,
1049            latest_run_id,
1050        }
1051    }
1052
1053    /// Returns a reference to the underlying snapshot.
1054    pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1055        self.snapshot
1056    }
1057
1058    /// Returns the replayability map.
1059    pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1060        &self.replayability
1061    }
1062
1063    /// Returns the replayability status for a specific run.
1064    ///
1065    /// # Panics
1066    ///
1067    /// Panics if the run ID is not in the snapshot. This maintains the
1068    /// invariant that all runs in the snapshot have replayability computed.
1069    pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1070        self.replayability
1071            .get(&run_id)
1072            .expect("run ID should be in replayability map")
1073    }
1074
1075    /// Returns the ID of the most recent run by start time, if any.
1076    pub fn latest_run_id(&self) -> Option<ReportUuid> {
1077        self.latest_run_id
1078    }
1079}
1080
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Test-only constructor that skips the real replayability checks.
    ///
    /// Every run in `snapshot` is recorded as
    /// [`ReplayabilityStatus::Replayable`].
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::with_capacity(snapshot.run_count());
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        // For tests, latest is just the most recent by time.
        let latest_run_id = snapshot
            .most_recent_run()
            .map(|resolved| resolved.run_id)
            .ok();

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1107
#[cfg(test)]
impl RunStoreSnapshot {
    /// Test-only constructor: wraps `runs` in a snapshot.
    ///
    /// The snapshot uses the fixed runs directory `/test/runs`, has write
    /// permission allowed, and gets an ID index built from `runs`.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        Self {
            // Field initializers run in written order, so the index borrows
            // `runs` before `runs` is moved into the struct below.
            run_id_index: RunIdIndex::new(&runs),
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
        }
    }
}
1123
/// Which flavor of file lock `acquire_lock_with_retry` should try to take.
#[derive(Clone, Copy)]
enum LockKind {
    /// A shared lock, taken with `File::try_lock_shared`.
    Shared,
    /// An exclusive lock, taken with `File::try_lock`.
    Exclusive,
}
1130
1131/// Acquires a file lock with retries, timing out after 5 seconds.
1132///
1133/// This handles both brief contention (another nextest process finishing up)
1134/// and filesystems where locking may not work properly (e.g., NFS).
1135fn acquire_lock_with_retry(
1136    file: &File,
1137    lock_file_path: &Utf8Path,
1138    kind: LockKind,
1139) -> Result<(), RunStoreError> {
1140    const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1141    const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1142
1143    let start = Instant::now();
1144    loop {
1145        let result = match kind {
1146            LockKind::Shared => file.try_lock_shared(),
1147            LockKind::Exclusive => file.try_lock(),
1148        };
1149
1150        match result {
1151            Ok(()) => return Ok(()),
1152            Err(TryLockError::WouldBlock) => {
1153                // Lock is held by another process. Retry if we haven't timed out.
1154                if start.elapsed() >= LOCK_TIMEOUT {
1155                    return Err(RunStoreError::FileLockTimeout {
1156                        path: lock_file_path.to_owned(),
1157                        timeout_secs: LOCK_TIMEOUT.as_secs(),
1158                    });
1159                }
1160                thread::sleep(LOCK_RETRY_INTERVAL);
1161            }
1162            Err(TryLockError::Error(error)) => {
1163                // Some other error (e.g., locking not supported on this filesystem).
1164                return Err(RunStoreError::FileLock {
1165                    path: lock_file_path.to_owned(),
1166                    error,
1167                });
1168            }
1169        }
1170    }
1171}