Skip to main content

sourcery_core/
concurrency.rs

1//! Compile-time concurrency strategy selection.
2//!
3//! This module provides marker types for choosing between optimistic
4//! (version-checked) and unchecked (last-writer-wins) concurrency control at
5//! the type level.
6//!
7//! # Example
8//!
9//! ```ignore
10//! // Default: optimistic concurrency (safe)
11//! let repo = Repository::new(store);
12//!
13//! // Opt-out for single-writer scenarios
14//! let repo = Repository::new(store).without_concurrency_checking();
15//! ```
16
17use std::fmt;
18
19use thiserror::Error;
20
21/// No version checking - last writer wins.
22///
23/// Events are appended without checking whether other events were added
24/// since loading. Suitable for single-writer scenarios or when conflicts
25/// are acceptable.
26#[derive(Debug, Clone, Copy, Default)]
27pub struct Unchecked;
28
29/// Optimistic concurrency control - version checked on every write.
30///
31/// This is the default concurrency strategy for
32/// [`Repository`](crate::Repository). With this strategy, the repository tracks
33/// the stream version when loading an aggregate and verifies it hasn't changed
34/// before appending new events. If the version changed (another writer appended
35/// events), the operation fails with a [`ConcurrencyConflict`] error.
36#[derive(Debug, Clone, Copy, Default)]
37pub struct Optimistic;
38
39/// Sealed trait for concurrency strategy markers.
40///
41/// This trait cannot be implemented outside this crate, ensuring only
42/// [`Unchecked`] and [`Optimistic`] can be used as concurrency strategies.
43pub trait ConcurrencyStrategy: private::Sealed + Default + Send + Sync {
44    /// Whether this strategy checks versions before appending.
45    const CHECK_VERSION: bool;
46}
47
48impl ConcurrencyStrategy for Unchecked {
49    const CHECK_VERSION: bool = false;
50}
51
52impl ConcurrencyStrategy for Optimistic {
53    const CHECK_VERSION: bool = true;
54}
55
56mod private {
57    pub trait Sealed {}
58    impl Sealed for super::Unchecked {}
59    impl Sealed for super::Optimistic {}
60}
61
62/// Error indicating a concurrency conflict during append.
63///
64/// This error is returned when using [`Optimistic`] concurrency and another
65/// writer has appended events to the stream since we loaded the aggregate.
66#[derive(Debug, Clone, PartialEq, Eq, Error)]
67#[error("{}", format_conflict(.expected.as_ref(), .actual.as_ref()))]
68pub struct ConcurrencyConflict<Pos: fmt::Debug> {
69    /// The version we expected (from when we loaded the aggregate).
70    /// `None` indicates we expected a new/empty stream.
71    pub expected: Option<Pos>,
72    /// The actual current version in the store.
73    /// `None` indicates the stream is empty (which shouldn't happen in a
74    /// conflict).
75    pub actual: Option<Pos>,
76}
77
78fn format_conflict<Pos: fmt::Debug>(expected: Option<&Pos>, actual: Option<&Pos>) -> String {
79    match (expected, actual) {
80        (None, Some(actual)) => {
81            format!(
82                "concurrency conflict: expected new stream, found version {actual:?} (hint: \
83                 another process created this aggregate; reload and retry)"
84            )
85        }
86        (Some(expected), actual) => {
87            format!(
88                "concurrency conflict: expected version {expected:?}, found {actual:?} (hint: \
89                 stream was modified; reload and retry)"
90            )
91        }
92        (None, None) => "concurrency conflict: unexpected empty state".to_string(),
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99
100    #[test]
101    fn conflict_expected_new_stream_mentions_hint() {
102        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
103            expected: None,
104            actual: Some(42),
105        };
106        let msg = conflict.to_string();
107        assert!(msg.contains("expected new stream"));
108        assert!(msg.contains("reload and retry"));
109    }
110
111    #[test]
112    fn conflict_expected_version_includes_versions() {
113        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
114            expected: Some(5),
115            actual: Some(10),
116        };
117        let msg = conflict.to_string();
118        assert!(msg.contains("expected version"));
119        assert!(msg.contains('5'));
120        assert!(msg.contains("10"));
121    }
122
123    #[test]
124    fn conflict_unexpected_empty_state_formats() {
125        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
126            expected: None,
127            actual: None,
128        };
129        let msg = conflict.to_string();
130        assert!(msg.contains("unexpected empty state"));
131    }
132}