sourcery_core/
concurrency.rs1use std::fmt;
18
19use thiserror::Error;
20
/// Zero-sized marker type for one of the two sealed [`ConcurrencyStrategy`]
/// implementations in this module.
///
/// Its `ConcurrencyStrategy::CHECK_VERSION` constant is `false`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Unchecked;
28
/// Zero-sized marker type for one of the two sealed [`ConcurrencyStrategy`]
/// implementations in this module.
///
/// Its `ConcurrencyStrategy::CHECK_VERSION` constant is `true`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Optimistic;
38
39pub trait ConcurrencyStrategy: private::Sealed + Default + Send + Sync {
44 const CHECK_VERSION: bool;
46}
47
48impl ConcurrencyStrategy for Unchecked {
49 const CHECK_VERSION: bool = false;
50}
51
52impl ConcurrencyStrategy for Optimistic {
53 const CHECK_VERSION: bool = true;
54}
55
56mod private {
57 pub trait Sealed {}
58 impl Sealed for super::Unchecked {}
59 impl Sealed for super::Optimistic {}
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, Error)]
67#[error("{}", format_conflict(.expected.as_ref(), .actual.as_ref()))]
68pub struct ConcurrencyConflict<Pos: fmt::Debug> {
69 pub expected: Option<Pos>,
72 pub actual: Option<Pos>,
76}
77
/// Builds the human-readable message for a concurrency conflict.
///
/// * `expected` - the position the caller expected; `None` is reported as
///   "expected new stream".
/// * `actual` - the position that was found, if any.
///
/// Positions are rendered with their `Debug` representation. The `Option`
/// wrapper itself is never printed: every arm formats the unwrapped value, so
/// the text reads "found 10" rather than "found Some(10)".
fn format_conflict<Pos: fmt::Debug>(expected: Option<&Pos>, actual: Option<&Pos>) -> String {
    match (expected, actual) {
        (None, Some(actual)) => format!(
            "concurrency conflict: expected new stream, found version {actual:?} (hint: \
             another process created this aggregate; reload and retry)"
        ),
        (Some(expected), Some(actual)) => format!(
            "concurrency conflict: expected version {expected:?}, found {actual:?} (hint: \
             stream was modified; reload and retry)"
        ),
        // No actual position at all: say so in words instead of printing `None`.
        (Some(expected), None) => format!(
            "concurrency conflict: expected version {expected:?}, found no version (hint: \
             stream was modified; reload and retry)"
        ),
        (None, None) => "concurrency conflict: unexpected empty state".to_string(),
    }
}
95
/// Unit tests for the `Display` output of `ConcurrencyConflict`.
#[cfg(test)]
mod tests {
    use super::*;

    /// `expected: None` with an actual version reports an unexpected existing
    /// stream and carries the retry hint.
    #[test]
    fn conflict_expected_new_stream_mentions_hint() {
        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
            expected: None,
            actual: Some(42),
        };
        let msg = conflict.to_string();
        assert!(msg.contains("expected new stream"));
        assert!(msg.contains("reload and retry"));
    }

    /// When both positions are present, both values appear in the message.
    #[test]
    fn conflict_expected_version_includes_versions() {
        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
            expected: Some(5),
            actual: Some(10),
        };
        let msg = conflict.to_string();
        assert!(msg.contains("expected version"));
        assert!(msg.contains('5'));
        assert!(msg.contains("10"));
    }

    /// Both positions absent falls through to the dedicated message.
    #[test]
    fn conflict_unexpected_empty_state_formats() {
        let conflict: ConcurrencyConflict<u64> = ConcurrencyConflict {
            expected: None,
            actual: None,
        };
        let msg = conflict.to_string();
        assert!(msg.contains("unexpected empty state"));
    }
}