Skip to main content

nextest_runner/record/
store.rs

1// Copyright (c) The nextest Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Run store management for nextest recordings.
5//!
6//! The run store is a directory that contains all recorded test runs. It provides:
7//!
8//! - A lock file for exclusive access during modifications.
9//! - A zstd-compressed JSON file (`runs.json.zst`) listing all recorded runs.
10//! - Individual directories for each run containing the archive and log.
11
12use super::{
13    display::{DisplayRecordedRunInfo, DisplayRecordedRunInfoDetailed, RunListAlignment, Styles},
14    format::{
15        RUN_LOG_FILE_NAME, RecordedRunList, RunsJsonWritePermission, STORE_FORMAT_VERSION,
16        STORE_ZIP_FILE_NAME, StoreFormatVersion, StoreVersionIncompatibility,
17    },
18    recorder::{RunRecorder, StoreSizes},
19    retention::{
20        PruneKind, PrunePlan, PruneResult, RecordRetentionPolicy, delete_orphaned_dirs, delete_runs,
21    },
22    run_id_index::{PrefixResolutionError, RunIdIndex, RunIdSelector, ShortestRunIdPrefix},
23};
24use crate::{
25    errors::{RunIdResolutionError, RunStoreError},
26    helpers::{ThemeCharacters, u32_decimal_char_width, usize_decimal_char_width},
27    redact::Redactor,
28};
29use camino::{Utf8Path, Utf8PathBuf};
30use chrono::{DateTime, FixedOffset, Local, TimeDelta, Utc};
31use debug_ignore::DebugIgnore;
32use quick_junit::ReportUuid;
33use semver::Version;
34use std::{
35    collections::{BTreeMap, HashMap, HashSet},
36    fmt,
37    fs::{File, TryLockError},
38    io,
39    num::NonZero,
40    thread,
41    time::{Duration, Instant},
42};
43
// Name of the lock file used to serialize access to the run store.
static RUNS_LOCK_FILE_NAME: &str = "runs.lock";
// Name of the zstd-compressed JSON file listing all recorded runs.
static RUNS_JSON_FILE_NAME: &str = "runs.json.zst";
46
/// A reference to the runs directory in a run store.
///
/// This provides methods to compute paths within the runs directory.
///
/// This is a `Copy` newtype around the runs directory path, so it can be
/// passed around by value without borrowing gymnastics.
#[derive(Clone, Copy, Debug)]
pub struct StoreRunsDir<'a>(&'a Utf8Path);
52
53impl<'a> StoreRunsDir<'a> {
54    /// Creates a new `StoreRunsDir` from a path.
55    pub fn new(path: &'a Utf8Path) -> Self {
56        Self(path)
57    }
58
59    /// Returns the path to a specific run's directory.
60    pub fn run_dir(self, run_id: ReportUuid) -> Utf8PathBuf {
61        self.0.join(run_id.to_string())
62    }
63
64    /// Returns a [`RunFilesExist`] implementation for checking file existence
65    /// for a specific run in this store.
66    pub fn run_files(self, run_id: ReportUuid) -> StoreRunFiles {
67        StoreRunFiles {
68            run_dir: self.run_dir(run_id),
69        }
70    }
71
72    /// Returns the underlying path to the runs directory.
73    pub fn as_path(self) -> &'a Utf8Path {
74        self.0
75    }
76}
77
/// Trait for checking whether required run files exist.
///
/// This abstracts over different storage backends (e.g. runs stored in the
/// on-disk store vs. other sources that can answer the same questions).
pub trait RunFilesExist {
    /// Returns true if `store.zip` exists for the run.
    fn store_zip_exists(&self) -> bool;
    /// Returns true if `run.log.zst` exists for the run.
    fn run_log_exists(&self) -> bool;
}
87
/// Checks file existence for a run stored on disk.
///
/// Created via [`StoreRunsDir::run_files`].
pub struct StoreRunFiles {
    // Absolute path to the run's directory inside the store.
    run_dir: Utf8PathBuf,
}
94
95impl RunFilesExist for StoreRunFiles {
96    fn store_zip_exists(&self) -> bool {
97        self.run_dir.join(STORE_ZIP_FILE_NAME).exists()
98    }
99
100    fn run_log_exists(&self) -> bool {
101        self.run_dir.join(RUN_LOG_FILE_NAME).exists()
102    }
103}
104
/// Manages the storage of recorded test runs.
///
/// The run store is a directory containing a list of recorded runs and their data.
/// Use [`RunStore::lock_exclusive`] to acquire exclusive access before creating
/// new runs, or [`RunStore::lock_shared`] for read-only access.
#[derive(Debug)]
pub struct RunStore {
    // Path to the `runs` subdirectory of the store directory.
    runs_dir: Utf8PathBuf,
}
114
115impl RunStore {
116    /// Creates a new `RunStore` at the given directory.
117    ///
118    /// Creates the directory if it doesn't exist.
119    pub fn new(store_dir: &Utf8Path) -> Result<Self, RunStoreError> {
120        let runs_dir = store_dir.join("runs");
121        std::fs::create_dir_all(&runs_dir).map_err(|error| RunStoreError::RunDirCreate {
122            run_dir: runs_dir.clone(),
123            error,
124        })?;
125
126        Ok(Self { runs_dir })
127    }
128
129    /// Returns the runs directory.
130    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
131        StoreRunsDir(&self.runs_dir)
132    }
133
134    /// Acquires a shared lock on the run store for reading.
135    ///
136    /// Multiple readers can hold the shared lock simultaneously, but the shared
137    /// lock is exclusive with the exclusive lock (used for writing).
138    ///
139    /// Uses non-blocking lock attempts with retries to handle both brief
140    /// contention and filesystems where locking may not work (e.g., NFS).
141    pub fn lock_shared(&self) -> Result<SharedLockedRunStore<'_>, RunStoreError> {
142        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
143        let file = std::fs::OpenOptions::new()
144            .create(true)
145            .truncate(false)
146            .write(true)
147            .open(&lock_file_path)
148            .map_err(|error| RunStoreError::FileLock {
149                path: lock_file_path.clone(),
150                error,
151            })?;
152
153        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Shared)?;
154        let result = read_runs_json(&self.runs_dir)?;
155        let run_id_index = RunIdIndex::new(&result.runs);
156
157        Ok(SharedLockedRunStore {
158            runs_dir: StoreRunsDir(&self.runs_dir),
159            locked_file: DebugIgnore(file),
160            runs: result.runs,
161            write_permission: result.write_permission,
162            run_id_index,
163        })
164    }
165
166    /// Acquires an exclusive lock on the run store.
167    ///
168    /// This lock should only be held for a short duration (just long enough to
169    /// add a run to the list and create its directory).
170    ///
171    /// Uses non-blocking lock attempts with retries to handle both brief
172    /// contention and filesystems where locking may not work (e.g., NFS).
173    pub fn lock_exclusive(&self) -> Result<ExclusiveLockedRunStore<'_>, RunStoreError> {
174        let lock_file_path = self.runs_dir.join(RUNS_LOCK_FILE_NAME);
175        let file = std::fs::OpenOptions::new()
176            .create(true)
177            .truncate(false)
178            .write(true)
179            .open(&lock_file_path)
180            .map_err(|error| RunStoreError::FileLock {
181                path: lock_file_path.clone(),
182                error,
183            })?;
184
185        acquire_lock_with_retry(&file, &lock_file_path, LockKind::Exclusive)?;
186        let result = read_runs_json(&self.runs_dir)?;
187
188        Ok(ExclusiveLockedRunStore {
189            runs_dir: StoreRunsDir(&self.runs_dir),
190            locked_file: DebugIgnore(file),
191            runs: result.runs,
192            last_pruned_at: result.last_pruned_at,
193            write_permission: result.write_permission,
194        })
195    }
196}
197
/// A run store that has been locked for exclusive access.
///
/// The lifetime parameter ensures this isn't held for longer than the
/// corresponding [`RunStore`].
#[derive(Debug)]
pub struct ExclusiveLockedRunStore<'store> {
    // The runs directory borrowed from the parent `RunStore`.
    runs_dir: StoreRunsDir<'store>,
    // Held for RAII lock semantics; the lock is released when this struct is dropped.
    #[expect(dead_code)]
    locked_file: DebugIgnore<File>,
    // In-memory copy of the run list read from runs.json.zst.
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, if recorded in runs.json.zst.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version is allowed to rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
}
212
213impl<'store> ExclusiveLockedRunStore<'store> {
214    /// Returns the runs directory.
215    pub fn runs_dir(&self) -> StoreRunsDir<'store> {
216        self.runs_dir
217    }
218
219    /// Returns whether this nextest can write to the runs.json.zst file.
220    ///
221    /// If the file has a newer format version than we support, writing is denied.
222    pub fn write_permission(&self) -> RunsJsonWritePermission {
223        self.write_permission
224    }
225
226    /// Marks a run as completed and persists the change to disk.
227    ///
228    /// Updates sizes, `status`, and `duration_secs` to the given values.
229    /// Returns `true` if the run was found and updated, `false` if no run
230    /// with the given ID exists (in which case nothing is persisted).
231    ///
232    /// Returns an error if writing is denied due to a format version mismatch.
233    ///
234    /// The status should not be `Incomplete` since we're completing the run.
235    pub fn complete_run(
236        &mut self,
237        run_id: ReportUuid,
238        sizes: StoreSizes,
239        status: RecordedRunStatus,
240        duration_secs: Option<f64>,
241    ) -> Result<bool, RunStoreError> {
242        if let RunsJsonWritePermission::Denied {
243            file_version,
244            max_supported_version,
245        } = self.write_permission
246        {
247            return Err(RunStoreError::FormatVersionTooNew {
248                file_version,
249                max_supported_version,
250            });
251        }
252
253        let found = self.mark_run_completed_inner(run_id, sizes, status, duration_secs);
254        if found {
255            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
256        }
257        Ok(found)
258    }
259
260    /// Updates a run's metadata in memory.
261    fn mark_run_completed_inner(
262        &mut self,
263        run_id: ReportUuid,
264        sizes: StoreSizes,
265        status: RecordedRunStatus,
266        duration_secs: Option<f64>,
267    ) -> bool {
268        for run in &mut self.runs {
269            if run.run_id == run_id {
270                run.sizes = RecordedSizes {
271                    log: ComponentSizes {
272                        compressed: sizes.log.compressed,
273                        uncompressed: sizes.log.uncompressed,
274                        entries: sizes.log.entries,
275                    },
276                    store: ComponentSizes {
277                        compressed: sizes.store.compressed,
278                        uncompressed: sizes.store.uncompressed,
279                        entries: sizes.store.entries,
280                    },
281                };
282                run.status = status;
283                run.duration_secs = duration_secs;
284                run.last_written_at = Local::now().fixed_offset();
285                return true;
286            }
287        }
288        false
289    }
290
291    /// Prunes runs according to the given retention policy.
292    ///
293    /// This method:
294    /// 1. Determines which runs to delete based on the policy
295    /// 2. Deletes those run directories from disk
296    /// 3. Deletes any orphaned directories not tracked in runs.json.zst
297    /// 4. Updates the run list in memory and on disk
298    ///
299    /// The `kind` parameter indicates whether this is explicit pruning (from a
300    /// user command) or implicit pruning (automatic during recording). This
301    /// affects how errors are displayed.
302    ///
303    /// Returns the result of the pruning operation, including any errors that
304    /// occurred while deleting individual runs.
305    ///
306    /// Returns an error if writing is denied due to a format version mismatch.
307    pub fn prune(
308        &mut self,
309        policy: &RecordRetentionPolicy,
310        kind: PruneKind,
311    ) -> Result<PruneResult, RunStoreError> {
312        if let RunsJsonWritePermission::Denied {
313            file_version,
314            max_supported_version,
315        } = self.write_permission
316        {
317            return Err(RunStoreError::FormatVersionTooNew {
318                file_version,
319                max_supported_version,
320            });
321        }
322
323        let now = Utc::now();
324        let to_delete: HashSet<_> = policy
325            .compute_runs_to_delete(&self.runs, now)
326            .into_iter()
327            .collect();
328
329        let runs_dir = self.runs_dir();
330        let mut result = if to_delete.is_empty() {
331            PruneResult::default()
332        } else {
333            delete_runs(runs_dir, &mut self.runs, &to_delete)
334        };
335        result.kind = kind;
336
337        let known_runs: HashSet<_> = self.runs.iter().map(|r| r.run_id).collect();
338        delete_orphaned_dirs(self.runs_dir, &known_runs, &mut result);
339
340        if result.deleted_count > 0 || result.orphans_deleted > 0 {
341            // Update last_pruned_at since we performed pruning.
342            self.last_pruned_at = Some(now);
343            write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
344        }
345
346        Ok(result)
347    }
348
349    /// Prunes runs if needed, based on time since last prune and limit thresholds.
350    ///
351    /// This method implements implicit pruning, which occurs:
352    /// - If more than 1 day has passed since the last prune, OR
353    /// - If any retention limit is exceeded by 1.5x.
354    ///
355    /// Use [`Self::prune`] for explicit pruning that always runs regardless of these conditions.
356    ///
357    /// Returns `Ok(None)` if pruning was skipped, `Ok(Some(result))` if pruning occurred.
358    pub fn prune_if_needed(
359        &mut self,
360        policy: &RecordRetentionPolicy,
361    ) -> Result<Option<PruneResult>, RunStoreError> {
362        const PRUNE_INTERVAL: TimeDelta = match TimeDelta::try_days(1) {
363            Some(d) => d,
364            None => panic!("1 day should always be a valid TimeDelta"),
365        };
366        const LIMIT_EXCEEDED_FACTOR: f64 = 1.5;
367
368        let now = Utc::now();
369
370        // Check if pruning is needed.
371        let time_since_last_prune = self
372            .last_pruned_at
373            .map(|last| now.signed_duration_since(last))
374            .unwrap_or(TimeDelta::MAX);
375
376        let should_prune = time_since_last_prune >= PRUNE_INTERVAL
377            || policy.limits_exceeded_by_factor(&self.runs, LIMIT_EXCEEDED_FACTOR);
378
379        if should_prune {
380            Ok(Some(self.prune(policy, PruneKind::Implicit)?))
381        } else {
382            Ok(None)
383        }
384    }
385
386    /// Creates a run recorder for a new run.
387    ///
388    /// Adds the run to the list and creates its directory. Consumes self,
389    /// dropping the exclusive lock.
390    ///
391    /// `max_output_size` specifies the maximum size of a single output (stdout/stderr)
392    /// before truncation.
393    ///
394    /// Returns the recorder and the shortest unique prefix for the run ID (for
395    /// display purposes), or an error if writing is denied due to a format
396    /// version mismatch.
397    #[expect(clippy::too_many_arguments)]
398    pub(crate) fn create_run_recorder(
399        mut self,
400        run_id: ReportUuid,
401        nextest_version: Version,
402        started_at: DateTime<FixedOffset>,
403        cli_args: Vec<String>,
404        build_scope_args: Vec<String>,
405        env_vars: BTreeMap<String, String>,
406        max_output_size: bytesize::ByteSize,
407        parent_run_id: Option<ReportUuid>,
408    ) -> Result<(RunRecorder, ShortestRunIdPrefix), RunStoreError> {
409        if let RunsJsonWritePermission::Denied {
410            file_version,
411            max_supported_version,
412        } = self.write_permission
413        {
414            return Err(RunStoreError::FormatVersionTooNew {
415                file_version,
416                max_supported_version,
417            });
418        }
419
420        // Add to the list of runs before creating the directory. This ensures
421        // that if creation fails, an empty run directory isn't left behind. (It
422        // does mean that there may be spurious entries in the list of runs,
423        // which will be dealt with during pruning.)
424
425        let now = Local::now().fixed_offset();
426        let run = RecordedRunInfo {
427            run_id,
428            store_format_version: STORE_FORMAT_VERSION,
429            nextest_version,
430            started_at,
431            last_written_at: now,
432            duration_secs: None,
433            cli_args,
434            build_scope_args,
435            env_vars,
436            parent_run_id,
437            sizes: RecordedSizes::default(),
438            status: RecordedRunStatus::Incomplete,
439        };
440        self.runs.push(run);
441
442        // If the parent run ID is set, update its last written at time.
443        if let Some(parent_run_id) = parent_run_id
444            && let Some(parent_run) = self.runs.iter_mut().find(|r| r.run_id == parent_run_id)
445        {
446            parent_run.last_written_at = now;
447        }
448
449        write_runs_json(self.runs_dir.as_path(), &self.runs, self.last_pruned_at)?;
450
451        // Compute the unique prefix now that the run is in the list.
452        let index = RunIdIndex::new(&self.runs);
453        let unique_prefix = index
454            .shortest_unique_prefix(run_id)
455            .expect("run was just added to the list");
456
457        // Create the run directory while still holding the lock. This prevents
458        // a race where another process could prune the newly-added run entry
459        // before the directory exists, leaving an orphaned directory. The lock
460        // is released when `self` is dropped.
461        let run_dir = self.runs_dir().run_dir(run_id);
462
463        let recorder = RunRecorder::new(run_dir, max_output_size)?;
464        Ok((recorder, unique_prefix))
465    }
466}
467
/// Information about a recorded run.
///
/// One entry per run in runs.json.zst; the run's data lives in its own
/// directory under the runs directory.
#[derive(Clone, Debug)]
pub struct RecordedRunInfo {
    /// The unique identifier for this run.
    pub run_id: ReportUuid,
    /// The format version of this run's store.zip archive.
    ///
    /// This allows checking replayability without opening the archive.
    pub store_format_version: StoreFormatVersion,
    /// The version of nextest that created this run.
    pub nextest_version: Version,
    /// When the run started.
    pub started_at: DateTime<FixedOffset>,
    /// When this run was last written to.
    ///
    /// Used for LRU eviction. Updated when the run is created, when the run
    /// completes, and when a rerun references this run.
    pub last_written_at: DateTime<FixedOffset>,
    /// Duration of the run in seconds.
    ///
    /// This is `None` for incomplete runs.
    pub duration_secs: Option<f64>,
    /// The command-line arguments used to invoke nextest.
    pub cli_args: Vec<String>,
    /// Build scope arguments (package and target selection).
    ///
    /// These determine which packages and targets are built. In a rerun chain,
    /// these are inherited from the original run unless explicitly overridden.
    pub build_scope_args: Vec<String>,
    /// Environment variables that affect nextest behavior (NEXTEST_* and CARGO_*).
    pub env_vars: BTreeMap<String, String>,
    /// If this is a rerun, the ID of the parent run.
    ///
    /// This forms a chain for iterative fix-and-rerun workflows.
    pub parent_run_id: Option<ReportUuid>,
    /// Sizes broken down by component (log and store).
    ///
    /// Default (all zeros) until the run completes and real sizes are recorded.
    pub sizes: RecordedSizes,
    /// The status and statistics for this run.
    pub status: RecordedRunStatus,
}
508
/// Sizes broken down by component (log and store).
#[derive(Clone, Copy, Debug, Default)]
pub struct RecordedSizes {
    /// Sizes for the run log (run.log.zst).
    pub log: ComponentSizes,
    /// Sizes for the store archive (store.zip).
    pub store: ComponentSizes,
}

/// Compressed and uncompressed sizes for a single component.
#[derive(Clone, Copy, Debug, Default)]
pub struct ComponentSizes {
    /// Compressed size in bytes.
    pub compressed: u64,
    /// Uncompressed size in bytes.
    pub uncompressed: u64,
    /// Number of entries (records for log, files for store).
    pub entries: u64,
}

impl RecordedSizes {
    /// Total compressed bytes across both components (log + store).
    pub fn total_compressed(&self) -> u64 {
        self.store.compressed + self.log.compressed
    }

    /// Total uncompressed bytes across both components (log + store).
    pub fn total_uncompressed(&self) -> u64 {
        self.store.uncompressed + self.log.uncompressed
    }

    /// Total entry count across both components (log records + store files).
    pub fn total_entries(&self) -> u64 {
        self.store.entries + self.log.entries
    }
}
545
/// Status and statistics for a recorded run.
#[derive(Clone, Debug)]
pub enum RecordedRunStatus {
    /// The run was interrupted before completion.
    Incomplete,
    /// A normal test run completed (all tests finished).
    Completed(CompletedRunStats),
    /// A normal test run was cancelled before all tests finished.
    Cancelled(CompletedRunStats),
    /// A stress test run completed (all iterations finished).
    StressCompleted(StressCompletedRunStats),
    /// A stress test run was cancelled before all iterations finished.
    StressCancelled(StressCompletedRunStats),
    /// An unknown status from a newer version of nextest.
    ///
    /// This variant is used for forward compatibility when reading runs.json.zst
    /// files created by newer nextest versions that may have new status types.
    Unknown,
}

impl RecordedRunStatus {
    /// Returns a short status string for display.
    pub fn short_status_str(&self) -> &'static str {
        match self {
            Self::Incomplete => "incomplete",
            Self::Completed(_) => "completed",
            Self::Cancelled(_) => "cancelled",
            Self::StressCompleted(_) => "stress completed",
            Self::StressCancelled(_) => "stress cancelled",
            Self::Unknown => "unknown",
        }
    }

    /// Returns the exit code for finished runs, or `None` for
    /// incomplete/unknown runs (which have no recorded exit code).
    pub fn exit_code(&self) -> Option<i32> {
        match self {
            Self::Completed(stats) | Self::Cancelled(stats) => Some(stats.exit_code),
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => Some(stats.exit_code),
            Self::Incomplete | Self::Unknown => None,
        }
    }
}

/// Statistics for a normal test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct CompletedRunStats {
    /// The number of tests that were expected to run.
    pub initial_run_count: usize,
    /// The number of tests that passed.
    pub passed: usize,
    /// The number of tests that failed (including exec failures and timeouts).
    pub failed: usize,
    /// The exit code from the run.
    pub exit_code: i32,
}

/// Statistics for a stress test run that finished (completed or cancelled).
#[derive(Clone, Copy, Debug)]
pub struct StressCompletedRunStats {
    /// The number of stress iterations that were expected to run, if known.
    ///
    /// This is `None` when the stress test was run without a fixed iteration count
    /// (e.g., `--stress-duration`).
    pub initial_iteration_count: Option<NonZero<u32>>,
    /// The number of stress iterations that succeeded.
    pub success_count: u32,
    /// The number of stress iterations that failed.
    pub failed_count: u32,
    /// The exit code from the run.
    pub exit_code: i32,
}
617
618// ---
619// Replayability checking
620// ---
621
/// The result of checking whether a run can be replayed.
///
/// Produced by [`RecordedRunInfo::check_replayability`].
#[derive(Clone, Debug)]
pub enum ReplayabilityStatus {
    /// The run is definitely replayable.
    ///
    /// No blocking reasons and no uncertain conditions.
    Replayable,
    /// The run is definitely not replayable.
    ///
    /// Contains at least one blocking reason.
    NotReplayable(Vec<NonReplayableReason>),
    /// The run might be replayable but is incomplete.
    ///
    /// The archive might be usable, but we'd need to open `store.zip` to
    /// verify all expected files are present.
    Incomplete,
}
639
/// A definite reason why a run cannot be replayed.
///
/// Collected into [`ReplayabilityStatus::NotReplayable`]; a run may have
/// several blocking reasons at once.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReplayableReason {
    /// The run's store format version is incompatible with this nextest version.
    ///
    /// This nextest version cannot read the archive format.
    StoreVersionIncompatible {
        /// The specific incompatibility.
        incompatibility: StoreVersionIncompatibility,
    },
    /// The `store.zip` file is missing from the run directory.
    MissingStoreZip,
    /// The `run.log.zst` file is missing from the run directory.
    MissingRunLog,
    /// The run status is `Unknown` (from a newer nextest version).
    ///
    /// We cannot safely replay since we don't understand the run's state.
    StatusUnknown,
}
659
660impl fmt::Display for NonReplayableReason {
661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662        match self {
663            Self::StoreVersionIncompatible { incompatibility } => {
664                write!(f, "store format version incompatible: {}", incompatibility)
665            }
666            Self::MissingStoreZip => {
667                write!(f, "store.zip is missing")
668            }
669            Self::MissingRunLog => {
670                write!(f, "run.log.zst is missing")
671            }
672            Self::StatusUnknown => {
673                write!(f, "run status is unknown (from a newer nextest version)")
674            }
675        }
676    }
677}
678
/// Result of looking up a run by selector.
///
/// Currently only carries the resolved run ID; kept as a struct so more
/// resolution metadata can be added without changing callers.
#[derive(Clone, Copy, Debug)]
pub struct ResolveRunIdResult {
    /// The run ID.
    pub run_id: ReportUuid,
}
685
impl RecordedRunStatus {
    /// Returns the width (in decimal digits) needed to display the "passed" count.
    ///
    /// For non-completed runs (Incomplete, Unknown), returns 0 since they don't
    /// display a passed count.
    pub fn passed_count_width(&self) -> usize {
        match self {
            Self::Incomplete | Self::Unknown => 0,
            Self::Completed(stats) | Self::Cancelled(stats) => {
                usize_decimal_char_width(stats.passed)
            }
            Self::StressCompleted(stats) | Self::StressCancelled(stats) => {
                // Stress runs track counts as u32, so use the u32 width helper.
                u32_decimal_char_width(stats.success_count)
            }
        }
    }
}
704
705impl RecordedRunInfo {
706    /// Checks whether this run can be replayed.
707    ///
708    /// This performs a comprehensive check of all conditions that might prevent
709    /// replay, including:
710    /// - Store format version compatibility
711    /// - Presence of required files (store.zip, run.log.zst)
712    /// - Run status (unknown, incomplete)
713    ///
714    /// The `files` parameter is used to check for file existence. Use
715    /// [`StoreRunsDir::run_files`] for runs in the store, or pass in a
716    /// [`PortableRecording`](super::PortableRecording) directly.
717    pub fn check_replayability(&self, files: &dyn RunFilesExist) -> ReplayabilityStatus {
718        let mut blocking = Vec::new();
719        let mut is_incomplete = false;
720
721        // Check store format version compatibility.
722        if let Err(incompatibility) = self
723            .store_format_version
724            .check_readable_by(STORE_FORMAT_VERSION)
725        {
726            blocking.push(NonReplayableReason::StoreVersionIncompatible { incompatibility });
727        }
728
729        // Check for required files.
730        if !files.store_zip_exists() {
731            blocking.push(NonReplayableReason::MissingStoreZip);
732        }
733        if !files.run_log_exists() {
734            blocking.push(NonReplayableReason::MissingRunLog);
735        }
736
737        // Check run status.
738        match &self.status {
739            RecordedRunStatus::Unknown => {
740                blocking.push(NonReplayableReason::StatusUnknown);
741            }
742            RecordedRunStatus::Incomplete => {
743                is_incomplete = true;
744            }
745            RecordedRunStatus::Completed(_)
746            | RecordedRunStatus::Cancelled(_)
747            | RecordedRunStatus::StressCompleted(_)
748            | RecordedRunStatus::StressCancelled(_) => {
749                // These statuses are fine for replay.
750            }
751        }
752
753        // Return the appropriate variant based on what we found.
754        if !blocking.is_empty() {
755            ReplayabilityStatus::NotReplayable(blocking)
756        } else if is_incomplete {
757            ReplayabilityStatus::Incomplete
758        } else {
759            ReplayabilityStatus::Replayable
760        }
761    }
762
763    /// Returns a display wrapper for this run.
764    ///
765    /// The `run_id_index` is used for computing shortest unique prefixes,
766    /// which are highlighted differently in the output (similar to jj).
767    ///
768    /// The `alignment` parameter controls column alignment when displaying a
769    /// list of runs. Use [`RunListAlignment::from_runs`] to precompute
770    /// alignment for a set of runs.
771    ///
772    /// The `redactor` parameter, if provided, redacts timestamps, durations,
773    /// and sizes for snapshot testing while preserving column alignment.
774    pub fn display<'a>(
775        &'a self,
776        run_id_index: &'a RunIdIndex,
777        replayability: &'a ReplayabilityStatus,
778        alignment: RunListAlignment,
779        styles: &'a Styles,
780        redactor: &'a Redactor,
781    ) -> DisplayRecordedRunInfo<'a> {
782        DisplayRecordedRunInfo::new(
783            self,
784            run_id_index,
785            replayability,
786            alignment,
787            styles,
788            redactor,
789        )
790    }
791
792    /// Returns a detailed display wrapper for this run.
793    ///
794    /// Unlike [`Self::display`] which shows a compact table row, this provides
795    /// a multi-line detailed view suitable for the `store info` command.
796    ///
797    /// The `replayability` parameter should be computed by the caller using
798    /// [`Self::check_replayability`].
799    ///
800    /// The `now` parameter is the current time, used to compute relative
801    /// durations (e.g. "30s ago").
802    ///
803    /// The `redactor` parameter redacts paths, timestamps, durations, and sizes
804    /// for snapshot testing. Use `Redactor::noop()` if no redaction is needed.
805    pub fn display_detailed<'a>(
806        &'a self,
807        run_id_index: &'a RunIdIndex,
808        replayability: &'a ReplayabilityStatus,
809        now: DateTime<Utc>,
810        styles: &'a Styles,
811        theme_characters: &'a ThemeCharacters,
812        redactor: &'a Redactor,
813    ) -> DisplayRecordedRunInfoDetailed<'a> {
814        DisplayRecordedRunInfoDetailed::new(
815            self,
816            run_id_index,
817            replayability,
818            now,
819            styles,
820            theme_characters,
821            redactor,
822        )
823    }
824}
825
/// Result of reading runs.json.zst.
struct ReadRunsJsonResult {
    // All runs listed in the file (empty if the file didn't exist).
    runs: Vec<RecordedRunInfo>,
    // When pruning last ran, if recorded in the file.
    last_pruned_at: Option<DateTime<Utc>>,
    // Whether this nextest version may rewrite the file.
    write_permission: RunsJsonWritePermission,
}
832
833/// Reads and deserializes `runs.json.zst`, converting to the internal
834/// representation.
835fn read_runs_json(runs_dir: &Utf8Path) -> Result<ReadRunsJsonResult, RunStoreError> {
836    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
837    let file = match File::open(&runs_json_path) {
838        Ok(file) => file,
839        Err(error) => {
840            if error.kind() == io::ErrorKind::NotFound {
841                // The file doesn't exist yet, so we can write a new one.
842                return Ok(ReadRunsJsonResult {
843                    runs: Vec::new(),
844                    last_pruned_at: None,
845                    write_permission: RunsJsonWritePermission::Allowed,
846                });
847            } else {
848                return Err(RunStoreError::RunListRead {
849                    path: runs_json_path,
850                    error,
851                });
852            }
853        }
854    };
855
856    let decoder = zstd::stream::Decoder::new(file).map_err(|error| RunStoreError::RunListRead {
857        path: runs_json_path.clone(),
858        error,
859    })?;
860
861    let list: RecordedRunList =
862        serde_json::from_reader(decoder).map_err(|error| RunStoreError::RunListDeserialize {
863            path: runs_json_path,
864            error,
865        })?;
866    let write_permission = list.write_permission();
867    let data = list.into_data();
868    Ok(ReadRunsJsonResult {
869        runs: data.runs,
870        last_pruned_at: data.last_pruned_at,
871        write_permission,
872    })
873}
874
875/// Serializes and writes runs.json.zst from internal representation.
876fn write_runs_json(
877    runs_dir: &Utf8Path,
878    runs: &[RecordedRunInfo],
879    last_pruned_at: Option<DateTime<Utc>>,
880) -> Result<(), RunStoreError> {
881    let runs_json_path = runs_dir.join(RUNS_JSON_FILE_NAME);
882    let list = RecordedRunList::from_data(runs, last_pruned_at);
883
884    atomicwrites::AtomicFile::new(&runs_json_path, atomicwrites::AllowOverwrite)
885        .write(|file| {
886            // Use compression level 3, consistent with other zstd usage in the crate.
887            let mut encoder = zstd::stream::Encoder::new(file, 3)?;
888            serde_json::to_writer_pretty(&mut encoder, &list)?;
889            encoder.finish()?;
890            Ok(())
891        })
892        .map_err(|error| RunStoreError::RunListWrite {
893            path: runs_json_path,
894            error,
895        })?;
896
897    Ok(())
898}
899
/// A run store that has been locked for shared (read-only) access.
///
/// Multiple readers can hold this lock simultaneously, but it is exclusive
/// with the exclusive lock used for writing.
#[derive(Debug)]
pub struct SharedLockedRunStore<'store> {
    /// The runs directory within the store.
    runs_dir: StoreRunsDir<'store>,
    /// The open lock file; never read, but keeping it alive keeps the shared
    /// lock held for this value's lifetime.
    #[expect(dead_code, reason = "held for lock duration")]
    locked_file: DebugIgnore<File>,
    /// Runs read from runs.json.zst while the lock was held.
    runs: Vec<RecordedRunInfo>,
    /// Whether this version of nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index over run IDs, used for prefix resolution.
    run_id_index: RunIdIndex,
}
913
914impl<'store> SharedLockedRunStore<'store> {
915    /// Returns a snapshot of the runs data, consuming self and releasing the
916    /// lock.
917    pub fn into_snapshot(self) -> RunStoreSnapshot {
918        RunStoreSnapshot {
919            runs_dir: self.runs_dir.as_path().to_owned(),
920            runs: self.runs,
921            write_permission: self.write_permission,
922            run_id_index: self.run_id_index,
923        }
924    }
925}
926
/// A snapshot of run store data.
#[derive(Debug)]
pub struct RunStoreSnapshot {
    /// Owned path to the runs directory.
    runs_dir: Utf8PathBuf,
    /// Runs captured at snapshot time.
    runs: Vec<RecordedRunInfo>,
    /// Whether this version of nextest may rewrite runs.json.zst.
    write_permission: RunsJsonWritePermission,
    /// Index over run IDs, used for prefix resolution.
    run_id_index: RunIdIndex,
}
935
936impl RunStoreSnapshot {
937    /// Returns the runs directory.
938    pub fn runs_dir(&self) -> StoreRunsDir<'_> {
939        StoreRunsDir(&self.runs_dir)
940    }
941
942    /// Returns whether this nextest can write to the runs.json.zst file.
943    ///
944    /// If the file has a newer format version than we support, writing is denied.
945    pub fn write_permission(&self) -> RunsJsonWritePermission {
946        self.write_permission
947    }
948
949    /// Returns a list of recorded runs.
950    pub fn runs(&self) -> &[RecordedRunInfo] {
951        &self.runs
952    }
953
954    /// Returns the number of recorded runs.
955    pub fn run_count(&self) -> usize {
956        self.runs.len()
957    }
958
959    /// Returns the total compressed size of all recorded runs in bytes.
960    pub fn total_size(&self) -> u64 {
961        self.runs.iter().map(|r| r.sizes.total_compressed()).sum()
962    }
963
964    /// Resolves a run ID selector to a run result.
965    ///
966    /// For [`RunIdSelector::Latest`], returns the most recent run by start
967    /// time.
968    /// For [`RunIdSelector::Prefix`], resolves the prefix to a specific run.
969    pub fn resolve_run_id(
970        &self,
971        selector: &RunIdSelector,
972    ) -> Result<ResolveRunIdResult, RunIdResolutionError> {
973        match selector {
974            RunIdSelector::Latest => self.most_recent_run(),
975            RunIdSelector::Prefix(prefix) => {
976                let run_id = self.resolve_run_id_prefix(prefix)?;
977                Ok(ResolveRunIdResult { run_id })
978            }
979        }
980    }
981
982    /// Resolves a run ID prefix to a full UUID.
983    ///
984    /// The prefix must be a valid hexadecimal string. If the prefix matches
985    /// exactly one run, that run's UUID is returned. Otherwise, an error is
986    /// returned indicating whether no runs matched or multiple runs matched.
987    fn resolve_run_id_prefix(&self, prefix: &str) -> Result<ReportUuid, RunIdResolutionError> {
988        self.run_id_index.resolve_prefix(prefix).map_err(|err| {
989            match err {
990                PrefixResolutionError::NotFound => RunIdResolutionError::NotFound {
991                    prefix: prefix.to_string(),
992                },
993                PrefixResolutionError::Ambiguous { count, candidates } => {
994                    // Convert UUIDs to full RecordedRunInfo and sort by start time (most recent first).
995                    let mut candidates: Vec<_> = candidates
996                        .into_iter()
997                        .filter_map(|run_id| self.get_run(run_id).cloned())
998                        .collect();
999                    candidates.sort_by(|a, b| b.started_at.cmp(&a.started_at));
1000                    RunIdResolutionError::Ambiguous {
1001                        prefix: prefix.to_string(),
1002                        count,
1003                        candidates,
1004                        run_id_index: self.run_id_index.clone(),
1005                    }
1006                }
1007                PrefixResolutionError::InvalidPrefix => RunIdResolutionError::InvalidPrefix {
1008                    prefix: prefix.to_string(),
1009                },
1010            }
1011        })
1012    }
1013
1014    /// Returns the run ID index for computing shortest unique prefixes.
1015    pub fn run_id_index(&self) -> &RunIdIndex {
1016        &self.run_id_index
1017    }
1018
1019    /// Looks up a run by its exact UUID.
1020    pub fn get_run(&self, run_id: ReportUuid) -> Option<&RecordedRunInfo> {
1021        self.runs.iter().find(|r| r.run_id == run_id)
1022    }
1023
1024    /// Returns the most recent run by start time.
1025    ///
1026    /// Returns an error if there are no runs at all.
1027    pub fn most_recent_run(&self) -> Result<ResolveRunIdResult, RunIdResolutionError> {
1028        self.runs
1029            .iter()
1030            .max_by_key(|r| r.started_at)
1031            .map(|r| ResolveRunIdResult { run_id: r.run_id })
1032            .ok_or(RunIdResolutionError::NoRuns)
1033    }
1034
1035    /// Computes which runs would be deleted by a prune operation.
1036    ///
1037    /// This is used for dry-run mode to show what would be deleted without
1038    /// actually deleting anything. Returns a [`PrunePlan`] containing the runs
1039    /// that would be deleted, sorted by start time (oldest first).
1040    pub fn compute_prune_plan(&self, policy: &RecordRetentionPolicy) -> PrunePlan {
1041        PrunePlan::compute(&self.runs, policy)
1042    }
1043}
1044
/// A snapshot paired with precomputed replayability status for all runs.
///
/// This struct maintains the invariant that every run in the snapshot has a
/// corresponding entry in the replayability map. Use [`Self::new`] to compute
/// replayability for all runs, or `Self::new_for_test` for testing.
#[derive(Debug)]
pub struct SnapshotWithReplayability<'a> {
    /// The underlying snapshot; borrowed, not owned.
    snapshot: &'a RunStoreSnapshot,
    /// Replayability status per run; covers every run in `snapshot` (see the
    /// struct-level invariant above).
    replayability: HashMap<ReportUuid, ReplayabilityStatus>,
    /// ID of the most recent run by start time, or `None` if there are no runs.
    latest_run_id: Option<ReportUuid>,
}
1056
1057impl<'a> SnapshotWithReplayability<'a> {
1058    /// Creates a new snapshot with replayability by checking all runs.
1059    ///
1060    /// This computes [`ReplayabilityStatus`] for each run by checking file
1061    /// existence and format versions.
1062    pub fn new(snapshot: &'a RunStoreSnapshot) -> Self {
1063        let runs_dir = snapshot.runs_dir();
1064        let replayability: HashMap<_, _> = snapshot
1065            .runs()
1066            .iter()
1067            .map(|run| {
1068                (
1069                    run.run_id,
1070                    run.check_replayability(&runs_dir.run_files(run.run_id)),
1071                )
1072            })
1073            .collect();
1074
1075        // Find the latest run by time.
1076        let latest_run_id = snapshot.most_recent_run().ok().map(|r| r.run_id);
1077
1078        Self {
1079            snapshot,
1080            replayability,
1081            latest_run_id,
1082        }
1083    }
1084
1085    /// Returns a reference to the underlying snapshot.
1086    pub fn snapshot(&self) -> &'a RunStoreSnapshot {
1087        self.snapshot
1088    }
1089
1090    /// Returns the replayability map.
1091    pub fn replayability(&self) -> &HashMap<ReportUuid, ReplayabilityStatus> {
1092        &self.replayability
1093    }
1094
1095    /// Returns the replayability status for a specific run.
1096    ///
1097    /// # Panics
1098    ///
1099    /// Panics if the run ID is not in the snapshot. This maintains the
1100    /// invariant that all runs in the snapshot have replayability computed.
1101    pub fn get_replayability(&self, run_id: ReportUuid) -> &ReplayabilityStatus {
1102        self.replayability
1103            .get(&run_id)
1104            .expect("run ID should be in replayability map")
1105    }
1106
1107    /// Returns the ID of the most recent run by start time, if any.
1108    pub fn latest_run_id(&self) -> Option<ReportUuid> {
1109        self.latest_run_id
1110    }
1111}
1112
#[cfg(test)]
impl SnapshotWithReplayability<'_> {
    /// Creates a snapshot with replayability for testing.
    ///
    /// Every run is marked [`ReplayabilityStatus::Replayable`]; no files are
    /// actually inspected.
    pub fn new_for_test(snapshot: &RunStoreSnapshot) -> SnapshotWithReplayability<'_> {
        let mut replayability = HashMap::new();
        for run in snapshot.runs() {
            replayability.insert(run.run_id, ReplayabilityStatus::Replayable);
        }

        // In tests, "latest" is simply the run with the newest start time.
        let latest_run_id = snapshot
            .runs()
            .iter()
            .max_by_key(|run| run.started_at)
            .map(|run| run.run_id);

        SnapshotWithReplayability {
            snapshot,
            replayability,
            latest_run_id,
        }
    }
}
1139
#[cfg(test)]
impl RunStoreSnapshot {
    /// Creates a new snapshot for testing.
    ///
    /// Uses a fixed fake runs directory and grants write permission.
    pub(crate) fn new_for_test(runs: Vec<RecordedRunInfo>) -> Self {
        // `RunIdIndex` is already imported at the top of this file; the
        // function-local `use super::run_id_index::RunIdIndex` it previously
        // carried was redundant and has been removed.
        let run_id_index = RunIdIndex::new(&runs);
        Self {
            runs_dir: Utf8PathBuf::from("/test/runs"),
            runs,
            write_permission: RunsJsonWritePermission::Allowed,
            run_id_index,
        }
    }
}
1155
/// The kind of lock to acquire.
#[derive(Clone, Copy)]
enum LockKind {
    /// A shared (read) lock; may be held by multiple processes at once.
    Shared,
    /// An exclusive (write) lock; excludes all other lock holders.
    Exclusive,
}
1162
1163/// Acquires a file lock with retries, timing out after 5 seconds.
1164///
1165/// This handles both brief contention (another nextest process finishing up)
1166/// and filesystems where locking may not work properly (e.g., NFS).
1167fn acquire_lock_with_retry(
1168    file: &File,
1169    lock_file_path: &Utf8Path,
1170    kind: LockKind,
1171) -> Result<(), RunStoreError> {
1172    const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
1173    const LOCK_RETRY_INTERVAL: Duration = Duration::from_millis(100);
1174
1175    let start = Instant::now();
1176    loop {
1177        let result = match kind {
1178            LockKind::Shared => file.try_lock_shared(),
1179            LockKind::Exclusive => file.try_lock(),
1180        };
1181
1182        match result {
1183            Ok(()) => return Ok(()),
1184            Err(TryLockError::WouldBlock) => {
1185                // Lock is held by another process. Retry if we haven't timed out.
1186                if start.elapsed() >= LOCK_TIMEOUT {
1187                    return Err(RunStoreError::FileLockTimeout {
1188                        path: lock_file_path.to_owned(),
1189                        timeout_secs: LOCK_TIMEOUT.as_secs(),
1190                    });
1191                }
1192                thread::sleep(LOCK_RETRY_INTERVAL);
1193            }
1194            Err(TryLockError::Error(error)) => {
1195                // Some other error (e.g., locking not supported on this filesystem).
1196                return Err(RunStoreError::FileLock {
1197                    path: lock_file_path.to_owned(),
1198                    error,
1199                });
1200            }
1201        }
1202    }
1203}